[c#] تنتظر التعشيش في Parallel.ForEach



3 Answers

جواب svick هو (كالعادة) ممتازة.

ومع ذلك ، أجد أن تدفق البيانات أكثر فائدة عندما يكون لديك بالفعل كميات كبيرة من البيانات لنقلها. أو عندما تحتاج إلى قائمة انتظار متوافقة غير متزامنة.

في حالتك ، فإن الحل الأبسط هو مجرد استخدام التوازي غير async :

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Question

في تطبيق مترو ، أحتاج إلى تنفيذ عدد من مكالمات WCF. هناك عدد كبير من المكالمات التي يجب إجراؤها ، لذلك أحتاج إلى تنفيذها في حلقة متوازية. تكمن المشكلة في إنهاء الحلقة المتوازية قبل اكتمال مكالمات WCF.

كيف يمكنك ريفاكتور هذا للعمل كما هو متوقع؟

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();



يمكنك توفير الجهد مع حزمة NuGet الجديدة AsyncEnumerator ، التي لم تكن موجودة قبل 4 سنوات عندما تم نشر السؤال في الأصل. يسمح لك بالتحكم في درجة التوازي:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

إخلاء المسؤولية: أنا مؤلف مكتبة AsyncEnumerator ، وهي مفتوحة المصدر ومرخصة تحت MIT ، وأنا نشر هذه الرسالة فقط لمساعدة المجتمع.




هذا يجب أن يكون فعالاً للغاية ، وأسهل من الحصول على بيانات تدفق بيانات TPL بالكامل:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}



أنا متأخر قليلاً للحفل ولكن قد ترغب في استخدام GetAwaiter.GetResult () لتشغيل التعليمات البرمجية المتزامنة في سياق المزامنة ولكن كما paralled على النحو التالي؛

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});



Related