c# - जंजीर कार्य से अवलोकन




async-await system.reactive (2)

मैं एक निष्कर्ष बनाने की कोशिश कर रहा हूं जहां प्रत्येक आइटम को एक अतुल्यकालिक कार्य के माध्यम से बनाया गया है। अगले आइटम का पिछला आइटम (सह-रिकर्सन) के परिणाम पर एक async कॉल के माध्यम से उत्पादित किया जाना चाहिए। "जेनरेट करें" भाषा में यह ऐसा कुछ दिखाई देगा - सिवाय इसके कि जेनरेट एसिंक का समर्थन नहीं करता (न ही यह प्रारंभिक स्थिति पर प्रतिनिधि का समर्थन करता है।

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

एक और ठोस उदाहरण के रूप में, एक बार में 100 संदेश प्रेषित करके सर्विसबस कतार से सभी संदेशों को देखने के लिए, प्रोड्यूस फर्स्ट, जारी रखें और प्रोड्यूसनैज को निम्नानुसार कार्यान्वित करें:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

और फिर कॉल करें। .SelectMany(i => i) IObservable<IEnumerable<BrokeredMessage>> इसे IObservable<BrokeredMessage> में चालू करने के लिए IObservable<BrokeredMessage>

जहां _serviceBusReceiver निम्नानुसार एक इंटरफ़ेस का एक उदाहरण है:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

और ब्रोकरर्ड संदेश https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx से है


अच्छा परीक्षण करने के बाद मुझे लगता है कि यह काम अच्छी तरह से निर्मित आरएक्स ऑपरेटरों का उपयोग कर रहा है।

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                        .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

मैंने निम्न के साथ इस कोड का परीक्षण किया है:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

और यह क्रम चल रहा है:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

मुझे यह परिणाम मिला:

मेरे परीक्षण कोड ने रिएक्टिव एक्स्टेंशन टीम के इंटरएक्टिव एक्सटेंशंस - न्यूगेट "आईक्स-मेन" का भी उपयोग किया था।


यदि आप अपने स्वयं के async Generate फ़ंक्शन को रोल करने जा रहे हैं, तो मैं समय की लूप लपेटने के बजाय रिकर्सिव शेड्यूलिंग के उपयोग की सिफारिश करता हूं।

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

इसमें कुछ लाभ हैं सबसे पहले, आप इसे रद्द करने में सक्षम होते हैं, जबकि एक सरल लूप के साथ इसे सीधे रद्द करने का कोई रास्ता नहीं है, वास्तव में जब तक कि जांच योग्य पूरा न हो तब तक आप सदस्यता फ़ंक्शन के लिए भी वापस नहीं लौटेंगे। दूसरा, यह आपको प्रत्येक आइटम के शेड्यूलिंग / असिंक्रनाइज़ को नियंत्रित करने देता है (जो एक हवा का परीक्षण करता है), यह पुस्तकालय के लिए एक बेहतर समग्र फिट बनाता है





corecursion