[c#] Nesting te espera en Parallel.ForEach


Answers

La respuesta de svick es (como de costumbre) excelente.

Sin embargo, considero que Dataflow es más útil cuando realmente tiene grandes cantidades de datos para transferir. O cuando necesita una cola compatible con async .

En su caso, una solución más simple es simplemente usar el paralelismo estilo async :

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
Question

En una aplicación de metro, necesito ejecutar varias llamadas WCF. Hay una cantidad importante de llamadas que hacer, por lo que necesito hacerlas en un bucle paralelo. El problema es que el ciclo paralelo sale antes de que las llamadas WCF estén completas.

¿Cómo podría refactorizar esto para que funcione como se espera?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();



Llego un poco tarde a la fiesta, pero es posible que desee considerar el uso de GetAwaiter.GetResult () para ejecutar su código asíncrono en el contexto de sincronización, pero como se describe a continuación;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});



Esto debería ser bastante eficiente y más fácil que hacer funcionar todo el TPL Dataflow:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}



Puede ahorrar esfuerzo con el nuevo paquete AsyncEnumerator NuGet , que no existía hace 4 años cuando la pregunta se publicó originalmente. Le permite controlar el grado de paralelismo:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, que es de código abierto y con licencia bajo MIT, y estoy publicando este mensaje solo para ayudar a la comunidad.




Links