javascript - रुकने योग्य RxJS स्ट्रीम




angular (4)

आप एक सरणी का उपयोग करके एक कस्टम बफर को लागू कर सकते हैं। जब bufferToggle उत्सर्जन करता है तो उन मानों को आपके कस्टम बफर में जोड़ देता है। तब कस्टम बफ़र से मान लें जब तक बफ़र में एक निश्चित तत्व रुकने की स्थिति से मेल नहीं खाता। उन मूल्यों का अनुकरण करें और अपनी धारा को विराम दें।

ngOnInit(): void {
  this.active$ = new BehaviorSubject<boolean>(true);
  const on$ = this.active$.pipe(filter(v => v));
  const off$ = this.active$.pipe(filter(v => !v));
  timer(500, 500).pipe(
    share(),
    pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false), false)
  ).subscribe(console.log);
}

pauseOn(value: number): boolean { return value % 10 === 0; }

export function pausable<T, O>(
  on$: Observable<any>,
  off$: Observable<O>,
  haltCondition: (value: T) => boolean,
  pause: () => void,
  spread: boolean) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    let buffer: T[] = [];
    return merge(
      source.pipe(
        bufferToggle(off$, () => on$),
        tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
        map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
        tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
        map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
        mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
      ),
      source.pipe(
        windowToggle(on$, () => off$),
        mergeMap(x => x),
        tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
      ),
    );
  });
}

pausable ऑपरेटर वर्तमान में उन मानों का उत्सर्जन करेंगे जो रुकावट की स्थिति से मेल खाते हैं और फिर बाद में सीधे धारा को रोक देते हैं। यह आपकी विशिष्ट आवश्यकताओं के लिए समायोजित किया जा सकता है जैसे कम इनपुट मापदंडों के साथ सरलीकृत या share को pausable में शामिल किया जा सकता है।

यहाँ एक स्टैकब्लिट्ज़ है: https://stackblitz.com/edit/angular-rbv9by

मैंने इस उदाहरण में बफ़र सरणी नहीं फैलाई ताकि आप आसानी से देख सकें कि बफ़र से कौन से मान आते हैं और कौन से नहीं।

मूल

मान जाँचने के लिए बफर से पहले अपनी स्ट्रीम में कहीं न कहीं tap उपयोग tap और कुछ स्थिति के आधार पर स्ट्रीम को रोकें।

this.stream$.pipe(
  tap(value => this.active$.value && value % 5 === 0 ? this.active$.next(false) : null),
  bufferToggle(off$, () => on$),
  mergeAll(),
)

मेरे पास एक एकल बटन के साथ एक सरल घटक है जो आरएक्सजेएस टाइमर द्वारा उत्पन्न संख्याओं की धारा को शुरू और रोक देता है।

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}

यह पूरी तरह से काम करता है।

मुझे एक और फीचर जोड़ने की जरूरत है। मुझे स्ट्रीम में मान के आधार पर स्वचालित रूप से स्थिति का उपयोग करके स्ट्रीम ( this.active$.next(false) ) को रोकने की आवश्यकता है। उदाहरण के लिए, स्ट्रीम को रोकें यदि नवीनतम मान 5 से अधिक था।

क्या आपके पास इस बारे में कोई विचार है कि यह कैसे करना है?

यहाँ स्टैकब्लिट्ज़ https://stackblitz.com/edit/angular-6hjznn पर एक रननीय उदाहरण दिया गया है


के रूप में सरल के रूप में यह एक windowToggle साथ मिल सकता है windowToggle और windowToggle का उपयोग करें। एक्सटेंस (झूठा) काम करने का उदाहरण: https://stackblitz.com/edit/angular-pdw7kw

 defer(() => {
      let count = 0;
      return stream$.pipe(
        windowToggle(on$, () => off$),
        exhaustMap(obs => obs),
        mergeMap(_ => {
          if ((++count) % 5 === 0) {
            this.active$.next(false)
            return never()
          }
          return of(count)
        }),
      )
    }).subscribe(console.log)

यहाँ यह करने के लिए एक सरल तरीका है। timer() उपयोग करें timer() केवल एक एमिटर के रूप में, और एक गणना को अलग से बढ़ाएं। यह आपको थोड़ा और प्रत्यक्ष नियंत्रण देता है।

export class AppComponent implements OnInit {
  active = true;
  out$: Observable<number>;

  count = 0;

  ngOnInit(): void {

    const stream$ = timer(500, 500);

    this.out$ = stream$.pipe(
      filter(v => this.active),
      map(v => {
        this.count += 1;
        return this.count;
      }),
      tap(v => {
        if (this.count % 5 === 0) {
          this.active = false;
        }
      })
    )
  }

}

https://stackblitz.com/edit/angular-nzs7zh


यहां एक कस्टम पॉज़ ऑपरेटर है जो पॉज़ सिग्नल के true होने पर बस एक बफर में मान जमा करेगा, और जब यह false हो तो एक-एक करके उनका उत्सर्जन करें।

जब मान किसी विशिष्ट स्थिति से टकराता है, तो व्यवहार विषय पॉज़ सिग्नल को टॉगल करने के लिए एक साधारण tap ऑपरेटर के साथ इसे मिलाएं, और आपके पास बटन पर क्लिक करने पर कुछ रुक जाएगा और जब मान एक शर्त से मिलता है (इस मामले में 12 से अधिक):

यहाँ pause ऑपरेटर है:

function pause<T>(pauseSignal: Observable<boolean>) {
  return (source: Observable<T>) => Observable.create(observer => {
    const buffer = [];
    let paused = false;
    let error;
    let isComplete = false;

    function notify() {
      while (!paused && buffer.length) {
        const value = buffer.shift();
        observer.next(value);
      }

      if (!buffer.length && error) {
        observer.error(error);
      }

      if (!buffer.length && isComplete) {
        observer.complete();
      }
    }

    const subscription = pauseSignal.subscribe(
      p => {
        paused = !p;
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {});

    subscription.add(source.subscribe(
      v => {
        buffer.push(v);
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {
        isComplete = true;
        notify();
      }
    ));

    return subscription;
  });
}

यहाँ इसका उपयोग है:

const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
  pause(this.active$),
  tap(value => {
    if (CONDITION(value)) {
      this.active$.next(false);
    }
  }));

this.d = out$.subscribe(v => console.log(v));

और एक कामकाजी उदाहरण: https://stackblitz.com/edit/angular-bvxnbf





rxjs