c# - थ्रॉटलिंग एसिंक्रोनस कार्य




async-await semaphore (2)

मैं किसी भी समय किसी भी समय पूरा करने के लिए कितने कार्यों को लंबित कर सकता हूं इस पर एक सीमा के साथ, async कार्यों का एक समूह चलाने के लिए चाहते हैं।

मान लें कि आपके पास 1000 यूआरएल हैं, और आप केवल एक ही समय में 50 अनुरोध खोलना चाहते हैं; लेकिन जैसे ही एक अनुरोध पूरा हो जाता है, आप सूची में अगले यूआरएल के साथ एक कनेक्शन खोलते हैं। इस तरह, यूआरएल सूची समाप्त हो जाने तक, एक समय में हमेशा 50 कनेक्शन खुले रहते हैं।

यदि संभव हो तो मैं भी दिए गए धागे का उपयोग करना चाहता हूं।

मैं एक विस्तार विधि के साथ आया, ThrottleTasksAsync जो मैं चाहता हूं। क्या वहां पहले से ही एक आसान समाधान है? मुझे लगता है कि यह एक आम परिदृश्य है।

उपयोग:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

यहां कोड है:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

विधि इसे काम करने के लिए BlockingCollection और SemaphoreSlim का उपयोग करती है। थ्रॉटलर एक थ्रेड पर चलाया जाता है, और सभी एसिंक कार्यों को अन्य धागे पर चलाया जाता है। समानांतरता प्राप्त करने के लिए, मैंने एक maxDegreeOfParallelism पैरामीटर जोड़ा जो Parallel.ForEach को पारित किया गया है। फॉरएच लूप को while रूप में फिर से बनाया गया है।

पुराना संस्करण था:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

लेकिन, थ्रेड पूल तेजी से समाप्त हो जाता है, और आप async / await नहीं कर सकते हैं।

बोनस: BlockingCollection TryTake में समस्या को हल करने के लिए जहां Take() में एक अपवाद फेंक दिया जाता है जब CompleteAdding() कहा जाता है, मैं टाइमआउट के साथ TryTake अधिभार का उपयोग कर रहा हूं। अगर मैंने TryTake में टाइमआउट का उपयोग नहीं किया है, तो यह BlockingCollection TryTake का उपयोग करने के उद्देश्य को TryTake देगा क्योंकि TryTake ब्लॉक नहीं करेगा। क्या कोई बेहतर तरीका है? आदर्श रूप में, एक TakeAsync विधि होगी।


मान लें कि आपके पास 1000 यूआरएल हैं, और आप केवल एक ही समय में 50 अनुरोध खोलना चाहते हैं; लेकिन जैसे ही एक अनुरोध पूरा हो जाता है, आप सूची में अगले यूआरएल के साथ एक कनेक्शन खोलते हैं। इस तरह, यूआरएल सूची समाप्त हो जाने तक, एक समय में हमेशा 50 कनेक्शन खुले रहते हैं।

एसओ पर निम्नलिखित सरल समाधान कई बार सामने आया है। यह अवरुद्ध कोड का उपयोग नहीं करता है और धागे को स्पष्ट रूप से नहीं बनाता है, इसलिए यह बहुत अच्छी तरह से स्केल करता है:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

बात यह है कि, डाउनलोड किए गए डेटा की प्रसंस्करण एक अलग पाइपलाइन पर समानांतरता के एक अलग स्तर के साथ किया जाना चाहिए, खासकर यदि यह एक सीपीयू-बाध्य प्रसंस्करण है।

उदाहरण के लिए, आप शायद 4 थ्रेड को डेटा प्रसंस्करण (सीपीयू कोर की संख्या) कर रहे हैं, और अधिक डेटा के लिए 50 लंबित अनुरोध (जो थ्रेड का उपयोग नहीं करते हैं) करना चाहते हैं। AFAICT, यह नहीं है कि आपका कोड वर्तमान में क्या कर रहा है।

यही वह जगह है जहां टीपीएल डेटाफ्लो या आरएक्स एक पसंदीदा समाधान के रूप में काम में आ सकता है। फिर भी सादे टीपीएल के साथ ऐसा कुछ लागू करना निश्चित रूप से संभव है। नोट, यहां केवल एकमात्र अवरुद्ध कोड है जो कार्य के अंदर वास्तविक डेटा प्रोसेसिंग कर रहा है। Task.Run :

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}

अनुरोध के रूप में, यहां कोड है जिसके साथ मैं समाप्त हुआ।

यह कार्य मास्टर-विस्तार कॉन्फ़िगरेशन में स्थापित है, और प्रत्येक मास्टर को बैच के रूप में संसाधित किया जाता है। काम की प्रत्येक इकाई इस फैशन में कतारबद्ध है:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

अन्य बाहरी प्रक्रियाओं के लिए काम बचाने के लिए परास्नातक एक समय में बफर किए जाते हैं। प्रत्येक मास्टर के लिए विवरण मास्टर TransformManyBlock माध्यम से काम के लिए प्रेषित किए जाते हैं। एक बैच में विवरण एकत्र करने के लिए BatchedJoinBlock भी बनाया गया है।

वास्तविक कार्य विस्तार में किया जाता है TransformBlock , detailTransform रूप से, 150 एक समय में। BoundedCapacity को यह सुनिश्चित करने के लिए 300 पर सेट किया गया है कि श्रृंखला के आरंभ में बहुत से मास्टर्स को बफर नहीं किया जाता है, जबकि एक बार में 150 रिकॉर्ड संसाधित होने की अनुमति देने के लिए पर्याप्त विवरण रिकॉर्ड्स के लिए जगह छोड़ दी जाती है। ब्लॉक किसी object को अपने लक्ष्यों को आउटपुट करता है, क्योंकि यह एक Detail या Exception के आधार पर लिंक पर फ़िल्टर किया जाता है।

batchAction ActionBlock सभी बैचों से आउटपुट एकत्र करता है, और प्रत्येक बैच के लिए थोक डेटाबेस अपडेट, त्रुटि लॉगिंग इत्यादि करता है।

प्रत्येक मास्टर के लिए एक, कई BatchedJoinBlock एस होगा। चूंकि प्रत्येक ISourceBlock आउटपुट होता है और प्रत्येक बैच केवल एक मास्टर से जुड़े विस्तार रिकॉर्ड की संख्या स्वीकार करता है, बैचों को क्रमशः संसाधित किया जाएगा। प्रत्येक ब्लॉक केवल एक समूह को आउटपुट करता है, और इसे पूरा होने पर अनलिंक किया जाता है। केवल अंतिम बैच ब्लॉक अंतिम ActionBlock पूरा होने का प्रचार करता है।

डेटाफ्लो नेटवर्क:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });






tpl-dataflow