javascript रुकने योग्य RxJS स्ट्रीम




angular (4)

के रूप में सरल के रूप में यह एक windowToggle साथ मिल सकता है windowToggle और windowToggle का उपयोग करें। एक्सटेंस (झूठा) काम करने का उदाहरण: https://stackblitz.com/edit/angular-pdw7kw

 defer(() => {
      let count = 0;
      return stream$.pipe(
        windowToggle(on$, () => off$),
        exhaustMap(obs => obs),
        mergeMap(_ => {
          if ((++count) % 5 === 0) {
            this.active$.next(false)
            return never()
          }
          return of(count)
        }),
      )
    }).subscribe(console.log)

मेरे पास एक एकल बटन के साथ एक सरल घटक है जो आरएक्सजेएस टाइमर द्वारा उत्पन्न संख्याओं की धारा को शुरू और रोक देता है।

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}

यह पूरी तरह से काम करता है।

मुझे एक और फीचर जोड़ने की जरूरत है। मुझे स्ट्रीम में मान के आधार पर स्वचालित रूप से स्थिति का उपयोग करके स्ट्रीम ( this.active$.next(false) ) को रोकने की आवश्यकता है। उदाहरण के लिए, स्ट्रीम को रोकें यदि नवीनतम मान 5 से अधिक था।

क्या आपके पास इस बारे में कोई विचार है कि यह कैसे करना है?

यहाँ स्टैकब्लिट्ज़ https://stackblitz.com/edit/angular-6hjznn पर एक रननीय उदाहरण दिया गया है


मैं यह मान रहा हूं कि वांछित व्यवहार उन मूल्यों को प्राप्त करने से संबंधित नहीं है जो टाइमर प्रति से बाहर निकलता है, और यह कि चल रहे नोटिफिकेशन को जारी स्ट्रीम में रखने के बजाय (आपके उदाहरण में, टाइमर तब भी जारी रहता है, जब हम मानों को नहीं देखते हैं मुद्रित), यह वास्तव में रुक जाता है जब रुक जाता है।

मेरा समाधान स्टॉपवॉच नुस्खा से प्रेरित है

नीचे दिए गए समाधान खेलने और ठहराव के लिए दो अलग-अलग बटन का उपयोग करता है, लेकिन आप इसे स्वाद के लिए समायोजित कर सकते हैं। हम घटक के ngAfterViewInit हुक में सेवा के लिए (ViewChild) बटन पास करते हैं, फिर हम स्ट्रीम की सदस्यता लेते हैं।

// pausable.component.ts
  ngAfterViewInit() {
    this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement);

    this.pausableService.counter$
      .pipe(takeUntil(this.unsubscribe$)) // don't forget to unsubscribe :)
      .subscribe((state: State) => {
        console.log(state.value); // whatever you need
    });
  }
// pausable.service.ts
import { Injectable } from '@angular/core';

import { merge, fromEvent, Subject, interval, NEVER } from 'rxjs';
import { mapTo, startWith, scan, switchMap, tap, map } from 'rxjs/operators';

export interface State {
  active: boolean;
  value: number;
}

@Injectable({
  providedIn: 'root'
})
export class PausableService {

  public counter$;

  constructor() { }

  initPausableStream(start: HTMLElement, pause: HTMLElement) {

    // convenience functions to map an element click to a result
    const fromClick = (el: HTMLElement) => fromEvent(el, 'click');
    const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj));

    const pauseByCondition$ = new Subject();
    const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0;

    // define the events that may trigger a change
    const events$ = merge(
      clickMapTo(start, { active: true }),
      clickMapTo(pause, { active: false }),
      pauseByCondition$.pipe(mapTo({ active: false }))
    );

    // switch the counter stream based on events
    this.counter$ = events$.pipe(
      startWith({ active: true, value: 0 }),
      scan((state: State, curr) => ({ ...state, ...curr }), {}),
      switchMap((state: State) => state.active
        ? interval(500).pipe(
          tap(_ => ++state.value),
          map(_ => state))
        : NEVER),
      tap((state: State) => {
        if (pauseCondition(state)) {
          pauseByCondition$.next(); // trigger pause
        }
      })
    );
  }

}

यहाँ यह करने के लिए एक सरल तरीका है। timer() उपयोग करें timer() केवल एक एमिटर के रूप में, और एक गणना को अलग से बढ़ाएं। यह आपको थोड़ा और प्रत्यक्ष नियंत्रण देता है।

export class AppComponent implements OnInit {
  active = true;
  out$: Observable<number>;

  count = 0;

  ngOnInit(): void {

    const stream$ = timer(500, 500);

    this.out$ = stream$.pipe(
      filter(v => this.active),
      map(v => {
        this.count += 1;
        return this.count;
      }),
      tap(v => {
        if (this.count % 5 === 0) {
          this.active = false;
        }
      })
    )
  }

}

https://stackblitz.com/edit/angular-nzs7zh


यहां एक कस्टम पॉज़ ऑपरेटर है जो पॉज़ सिग्नल के true होने पर बस एक बफर में मान जमा करेगा, और जब यह false हो तो एक-एक करके उनका उत्सर्जन करें।

जब मान किसी विशिष्ट स्थिति से टकराता है, तो व्यवहार विषय पॉज़ सिग्नल को टॉगल करने के लिए एक साधारण tap ऑपरेटर के साथ इसे मिलाएं, और आपके पास बटन पर क्लिक करने पर कुछ रुक जाएगा और जब मान एक शर्त से मिलता है (इस मामले में 12 से अधिक):

यहाँ pause ऑपरेटर है:

function pause<T>(pauseSignal: Observable<boolean>) {
  return (source: Observable<T>) => Observable.create(observer => {
    const buffer = [];
    let paused = false;
    let error;
    let isComplete = false;

    function notify() {
      while (!paused && buffer.length) {
        const value = buffer.shift();
        observer.next(value);
      }

      if (!buffer.length && error) {
        observer.error(error);
      }

      if (!buffer.length && isComplete) {
        observer.complete();
      }
    }

    const subscription = pauseSignal.subscribe(
      p => {
        paused = !p;
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {});

    subscription.add(source.subscribe(
      v => {
        buffer.push(v);
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {
        isComplete = true;
        notify();
      }
    ));

    return subscription;
  });
}

यहाँ इसका उपयोग है:

const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
  pause(this.active$),
  tap(value => {
    if (CONDITION(value)) {
      this.active$.next(false);
    }
  }));

this.d = out$.subscribe(v => console.log(v));

और एक कामकाजी उदाहरण: https://stackblitz.com/edit/angular-bvxnbf







rxjs