[C#] تنتظر التعشيش في Parallel.ForEach


Answers

جواب svick هو (كالعادة) ممتازة.

ومع ذلك ، أجد أن تدفق البيانات أكثر فائدة عندما يكون لديك بالفعل كميات كبيرة من البيانات لنقلها. أو عندما تحتاج إلى قائمة انتظار متوافقة غير متزامنة.

في حالتك ، فإن الحل الأبسط هو مجرد استخدام التوازي غير async :

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Question

في تطبيق مترو ، أحتاج إلى تنفيذ عدد من مكالمات WCF. هناك عدد كبير من المكالمات التي يجب إجراؤها ، لذلك أحتاج إلى تنفيذها في حلقة متوازية. تكمن المشكلة في إنهاء الحلقة المتوازية قبل اكتمال مكالمات WCF.

كيف يمكنك ريفاكتور هذا للعمل كما هو متوقع؟

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();



يمكنك توفير الجهد مع حزمة NuGet الجديدة AsyncEnumerator ، التي لم تكن موجودة قبل 4 سنوات عندما تم نشر السؤال في الأصل. يسمح لك بالتحكم في درجة التوازي:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

إخلاء المسؤولية: أنا مؤلف مكتبة AsyncEnumerator ، وهي مفتوحة المصدر ومرخصة تحت MIT ، وأنا نشر هذه الرسالة فقط لمساعدة المجتمع.




أنا متأخر قليلاً للحفل ولكن قد ترغب في استخدام GetAwaiter.GetResult () لتشغيل التعليمات البرمجية المتزامنة في سياق المزامنة ولكن كما paralled على النحو التالي؛

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});



هذا يجب أن يكون فعالاً للغاية ، وأسهل من الحصول على بيانات تدفق بيانات TPL بالكامل:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}