пример - rxjava flatmap




Задержка RxJava для каждого отправляемого элемента списка (10)

Вы должны быть в состоянии достичь этого с помощью оператора Timer . Я пытался с delay но не смог достичь желаемого результата. Обратите внимание на вложенные операции, выполняемые в операторе flatmap .

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));

Я изо всех сил пытаюсь реализовать то, что, как я предполагал, будет довольно простым в Rx.

У меня есть список предметов, и я хочу, чтобы каждый предмет выдавался с задержкой.

Кажется, что оператор Rx delay () просто смещает излучение всех элементов на указанную задержку, а не каждого отдельного элемента.

Вот некоторый тестовый код. Он группирует элементы в списке. Каждая группа должна затем применить задержку перед отправкой.

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

Результат:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

Но то, что я ожидал увидеть, выглядит примерно так:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

Что я делаю неправильно?


Вы можете добавить задержку между испускаемыми элементами, используя flatMap, maxConcurrent и delay ()

Вот пример - выброс с задержкой 0..4

@Test
fun testEmitWithDelays() {
    val DELAY = 500L
    val COUNT = 5

    val latch = CountDownLatch(1)
    val startMoment = System.currentTimeMillis()
    var endMoment : Long = 0

    Observable
        .range(0, COUNT)
        .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
        .subscribe(
                { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                {},
                {
                    endMoment = System.currentTimeMillis()
                    latch.countDown()
                })

    latch.await()

    assertTrue { endMoment - startMoment >= DELAY * COUNT }
}

... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

Для пользователей kotlin я написал функцию расширения для подхода «почтовый индекс с интервалом»

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

Это работает так же, но это делает его многоразовым. Пример:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

Есть другой способ сделать это, используя concatMap, поскольку concatMap возвращает наблюдаемые исходные элементы. так что мы можем добавить задержку на это наблюдаемое.

вот что я попробовал.

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

Один из способов сделать это - использовать zip для объединения вашей наблюдаемой с наблюдаемым Interval чтобы задержать вывод.

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

Просто поделитесь простым подходом к отправке каждого элемента в коллекции с интервалом:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

Каждый элемент будет излучаться каждые 500 миллисекунд


Ты можешь использовать

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

Этот код выше не дублирует значение (индекс). "Я уверен"


Чтобы ввести задержку между каждым испускаемым элементом, полезно:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

Больше хороших вариантов на https://github.com/ReactiveX/RxJava/issues/3505


Я думаю, что вы хотите это:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

Таким образом, это приведет к задержке чисел, входящих в группу, а не к сокращению списка на 5 секунд.


Я думаю, что это именно то, что вам нужно. Взгляни:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });






delay