c# - Nesting te espera en Parallel.ForEach




wcf async-await task-parallel-library (9)

Llego un poco tarde a la fiesta, pero es posible que desee considerar el uso de GetAwaiter.GetResult () para ejecutar su código asíncrono en el contexto de sincronización, pero como se describe a continuación;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});

En una aplicación de metro, necesito ejecutar varias llamadas WCF. Hay una cantidad importante de llamadas que hacer, por lo que necesito hacerlas en un bucle paralelo. El problema es que el ciclo paralelo sale antes de que las llamadas WCF estén completas.

¿Cómo podría refactorizar esto para que funcione como se espera?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

La idea detrás de Parallel.ForEach() es que tienes un conjunto de hilos y cada hilo procesa parte de la colección. Como habrás notado, esto no funciona con async - await , donde deseas liberar el hilo mientras dura la llamada asincrónica.

Podrías "arreglarlo" bloqueando los hilos ForEach() , pero eso derrota todo el punto de async - await .

Lo que podría hacer es usar TPL Dataflow en lugar de Parallel.ForEach() , que también admite Task asíncronas.

Específicamente, su código podría escribirse utilizando un TransformBlock que transforma cada identificación en un Customer utiliza el async lambda. Este bloque se puede configurar para ejecutarse en paralelo. Usted vincularía ese bloque a un ActionBlock que escribe cada Customer en la consola. Después de configurar la red de bloques, puede Post() cada ID en TransformBlock .

En codigo:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Aunque probablemente desee limitar el paralelismo de TransformBlock a una pequeña constante. Además, podría limitar la capacidad de TransformBlock y agregar los elementos a ella de forma asíncrona utilizando SendAsync() , por ejemplo, si la colección es demasiado grande.

Como un beneficio adicional en comparación con su código (si funcionó), la escritura comenzará tan pronto como se complete un solo elemento y no espere hasta que todo el procesamiento haya finalizado.


Después de presentar un grupo de métodos de ayuda, podrá ejecutar consultas paralelas con este simple sintax:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

Lo que ocurre aquí es que dividimos la recopilación de fuentes en 10 fragmentos ( .Split(DegreeOfParallelism) ), luego ejecutamos 10 tareas, cada una de las cuales procesa sus elementos uno por uno ( .SelectManyAsync(...) ) y los .SelectManyAsync(...) en una sola lista.

Vale la pena mencionar que hay un enfoque más simple:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

Pero necesita una precaución : si tiene una colección de fuentes que es demasiado grande, producirá una Task para cada elemento de inmediato, lo que puede ocasionar importantes resultados de rendimiento.

Los métodos de extensión utilizados en los ejemplos anteriores tienen el siguiente aspecto:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}

Esto debería ser bastante eficiente y más fácil que hacer funcionar todo el TPL Dataflow:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}

Puede ahorrar esfuerzo con el nuevo paquete AsyncEnumerator NuGet , que no existía hace 4 años cuando la pregunta se publicó originalmente. Le permite controlar el grado de paralelismo:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, que es de código abierto y con licencia bajo MIT, y estoy publicando este mensaje solo para ayudar a la comunidad.


La respuesta de svick es (como de costumbre) excelente.

Sin embargo, considero que Dataflow es más útil cuando realmente tiene grandes cantidades de datos para transferir. O cuando necesita una cola compatible con async .

En su caso, una solución más simple es simplemente usar el paralelismo estilo async :

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();

Usar DataFlow como se sugirió puede ser excesivo, y la respuesta de Stephen no proporciona los medios para controlar la concurrencia de la operación. Sin embargo, eso se puede lograr de manera bastante simple:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

Las llamadas a ToArray() se pueden optimizar utilizando una matriz en lugar de una lista y reemplazando las tareas completadas, pero dudo que ToArray() una gran diferencia en la mayoría de los escenarios. Uso de muestra según la pregunta del OP:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

El usuario de EDIT Fellow SO y el experto de TPL, Eli Arbel, me señalaron un artículo relacionado de Stephen Toub . Como de costumbre, su implementación es elegante y eficiente:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });

        })); 
}

Ajustar Parallel.Foreach en una Task.Run () y en lugar de la palabra clave await use [yourasyncmethod] .Result

(debe hacer la tarea "Ejecutar.Run" para no bloquear el hilo de la interfaz de usuario)

Algo como esto:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;

Tal vez el System.Reflection.Emit nombres System.Reflection.Emit puede ayudarte. MethodAttributes enumeración MethodAttributes contiene elementos similares (SpecialName, RTSpecialName) .





c# wcf async-await task-parallel-library parallel.foreach