RxJava تأخير لكل عنصر من عناصر القائمة المنبعثة




rx-java delay (10)

أعتقد أنك تريد هذا:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

بهذه الطريقة ستؤخر الأرقام التي تدخل المجموعة بدلاً من تأخير القائمة المخفّضة بمقدار 5 ثوانٍ.

أجد صعوبة في تنفيذ شيء افترضت أنه سيكون بسيطًا جدًا في آر إكس.

لدي قائمة بالعناصر ، وأريد إرسال كل عنصر مع تأخير.

يبدو أن مشغل Rx delay () يحول مجرد انبعاث كل العناصر بالتأخير المحدد ، وليس كل عنصر على حدة.

وإليك بعض رمز الاختبار. يقوم بتجميع العناصر في قائمة. يجب على كل مجموعة أن تطبق تأخيرًا قبل إرسالها.

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

النتيجه هي:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

لكن ما أتوقع أن أراه هو شيء مثل هذا:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

ما الخطأ الذي افعله؟


أعتقد أنه بالضبط ما تحتاجه. ألقِ نظرة:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });

طريقة واحدة للقيام بذلك هي استخدام zip لدمج الخاص بك يمكن ملاحظتها مع Interval يمكن ملاحظتها لتأخير الإخراج.

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

لإدخال التأخير بين كل عنصر ينبعث منه مفيد:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

المزيد من الخيارات الجيدة على https://github.com/ReactiveX/RxJava/issues/3505


لمستخدمي kotlin ، كتبت وظيفة ملحق لنهج "zip with interval"

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

إنها تعمل بنفس الطريقة ، لكن هذا يجعلها قابلة لإعادة الاستخدام. مثال:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

مجرد مشاركة طريقة بسيطة لإصدار كل عنصر في مجموعة بفاصل زمني:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

سيتم إصدار كل عنصر في كل 500 مللي ثانية


يبدو أن أبسط طريقة للقيام بذلك هي مجرد استخدام concatMap ولف كل عنصر في "ملاحظة قابلة للتأخير".

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

مطبوعات:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

يجب أن تكون قادرًا على تحقيق ذلك باستخدام مشغل Timer . حاولت delay لكن لم أستطع تحقيق المخرجات المطلوبة. لاحظ العمليات المتداخلة التي تتم في مشغل flatmap .

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));

يمكنك استخدام

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

هذا الكود أعلاه ليس قيمة مكررة (فهرس). "انا متاكد"


يمكنك تطبيق مشغل rx مخصص مثل MinRegularIntervalDelayOperator ثم استخدمه مع وظيفة lift

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();






delay