c# - without - Comment puis-je empêcher les suites synchrones sur une tâche?




task run c# (4)

Je ne pense pas qu'il y ait quoi que ce soit dans TPL qui fournirait un contrôle d'API explicite sur les TaskCompletionSource.SetResult . J'ai décidé de garder ma réponse initiale pour contrôler ce comportement pour les scénarios async/await .

Voici une autre solution qui impose asynchrone sur ContinueWith , si la continuation tcs.SetResult -triggered a lieu sur le même thread que le SetResult été appelé:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Mis à jour pour répondre au commentaire:

Je ne contrôle pas l'appelant - je ne peux pas l'amener à utiliser une variante spécifique continue: si je pouvais, le problème n'existerait pas en premier lieu

Je ne savais pas que vous ne contrôliez pas l'appelant. Néanmoins, si vous ne le contrôlez pas, vous ne passez probablement pas l'objet TaskCompletionSource directement à l'appelant. Logiquement, vous tcs.Task la partie symbolique , c'est-à-dire tcs.Task . Dans ce cas, la solution pourrait être encore plus facile, en ajoutant une autre méthode d'extension à ce qui précède:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Utilisation:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Cela fonctionne réellement à la fois pour await et ContinueWith ( fiddle ) et est exempt de hacks de réflexion.

J'ai un code de bibliothèque (réseau de socket) qui fournit une API basée sur les Task pour les réponses en attente aux demandes, basée sur TaskCompletionSource<T> . Cependant, il y a un ennui dans le TPL en ce sens qu'il semble impossible d'empêcher les continuations synchrones. Ce que j'aimerais pouvoir faire est soit:

  • indiquer une TaskCompletionSource<T> qui ne doit pas autoriser les appelants à se joindre à TaskContinuationOptions.ExecuteSynchronously , ou
  • définir le résultat ( SetResult / TrySetResult ) d'une manière qui spécifie que TaskContinuationOptions.ExecuteSynchronously doit être ignoré, en utilisant le pool à la place

Plus précisément, le problème que j'ai est que les données entrantes sont traitées par un lecteur dédié, et si un appelant peut se joindre à TaskContinuationOptions.ExecuteSynchronously ils peuvent bloquer le lecteur (ce qui affecte plus que seulement eux). Auparavant, j'ai travaillé autour de cela par un hackery qui détecte si des suites sont présentes, et si elles le sont, il pousse l'achèvement sur le ThreadPool , mais cela a un impact significatif si l'appelant a saturé leur file d'attente, traité en temps opportun. S'ils utilisent Task.Wait() (ou similaire), ils se Task.Wait() alors eux-mêmes. De même, c'est pourquoi le lecteur est sur un fil dédié plutôt que d'utiliser des travailleurs.

Alors; avant d'essayer de harceler l'équipe TPL: est-ce que je rate une option?

Points clés:

  • Je ne veux pas que les appelants externes puissent détourner mon fil
  • Je ne peux pas utiliser le ThreadPool tant ThreadPool , car il doit fonctionner lorsque le pool est saturé

L'exemple ci-dessous produit une sortie (l'ordre peut varier en fonction du timing):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Le problème est le fait qu'un appelant aléatoire a réussi à obtenir une continuation sur "Main thread". Dans le code réel, cela interromprait le lecteur principal; mauvaises choses!

Code:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

L'approche simuler l'abandon semblait très bonne, mais a conduit à des détournements de la TPL dans certains scénarios .

J'ai ensuite eu une implémentation similaire à la vérification de l'objet de continuation , mais en vérifiant simplement s'il y avait trop de scénarios pour que le code donné fonctionne bien, mais cela signifiait que même des Task.Wait telles que Task.Wait aboutissaient à un thread. recherche de piscine.

En fin de compte, après avoir inspecté beaucoup, beaucoup de IL, le seul scénario sûr et utile est le scénario SetOnInvokeMres (manuel-reset-event-slim continuation). Il y a beaucoup d'autres scénarios:

  • certains ne sont pas sûrs, et conduisent à un détournement de fil
  • le reste ne sont pas utiles, car ils conduisent finalement à la piscine de threads

Donc, à la fin, j'ai choisi de vérifier un continuation-object non nul; si c'est nul, bien (pas de continuations); si elle est non nulle, vérifiez si SetOnInvokeMres cas SetOnInvokeMres : si c'est bien: SetOnInvokeMres (invoqué en toute sécurité); sinon, laissez le pool de threads exécuter le TrySetComplete , sans indiquer à la tâche de faire quoi que ce soit de spécial, comme l'usurpation d'identité. Task.Wait utilise l'approche SetOnInvokeMres , qui est le scénario spécifique que nous voulons vraiment essayer de ne pas bloquer.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

si vous pouvez et êtes prêt à utiliser la réflexion, cela devrait le faire;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

Mis à jour , j'ai posté une réponse séparée pour faire face à ContinueWith par opposition à await (parce que ContinueWith ne se soucie pas du contexte de synchronisation en cours).

Vous pouvez utiliser un contexte de synchronisation bête pour imposer l'asynchronie lors de la continuation déclenchée en appelant SetResult/SetCancelled/SetException sur TaskCompletionSource . Je crois que le contexte de synchronisation actuel (au moment d' await tcs.Task ) est le critère utilisé par TPL pour décider si une telle continuation est synchrone ou asynchrone.

Les travaux suivants pour moi:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync est implémenté comme ceci:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext est très bon marché en termes de surcharge ajoutée. En fait, une approche très similaire est prise par l' implémentation de WPF Dispatcher.BeginInvoke .

TPL compare le contexte de synchronisation cible au point d' await à celui du point de tcs.SetResult . Si le contexte de synchronisation est le même (ou s'il n'y a pas de contexte de synchronisation aux deux endroits), la continuation est appelée directement, de manière synchrone. Sinon, il est mis en file d'attente à l'aide de SynchronizationContext.Post sur le contexte de synchronisation cible, c'est-à-dire le comportement d' await normal. Cette approche impose toujours le comportement SynchronizationContext.Post (ou une continuation de thread de pool s'il n'y a pas de contexte de synchronisation cible).

Mis à jour , cela ne fonctionnera pas pour task.ContinueWith , parce que ContinueWith ne se soucie pas du contexte de synchronisation en cours. Il travaille cependant pour await task ( fiddle ). Il fonctionne également pour await task.ConfigureAwait(false) .

OTOH, cette approche fonctionne pour ContinueWith .





async-await