c# - concurrent list java




.NET 4.0中没有ConcurrentList<T>? (7)

我很高兴在.NET 4.0中看到新的System.Collections.Concurrent命名空间,相当不错! 我见过ConcurrentDictionaryConcurrentQueueConcurrentStackConcurrentBagBlockingCollection

似乎神秘缺失的一件事是ConcurrentList<T> 。 我必须自己写(或从网上下载:))?

我在这里错过了很明显的东西吗


ConcurrentList (作为可调整大小的数组,不是链接列表)不容易使用非阻塞操作进行编写。 其API不能很好地转换为“并发”版本。


System.Collections.Generic.List<t>对多个读者来说已经是线程安全的。 试图使它对多个作者是安全的,这是没有意义的。 (由于Henk和Stephen已经提到的原因)


在顺序执行代码时,所使用的数据结构不同于(良好编写的)并行执行的代码。 原因是顺序代码隐含着顺序。 然而,并发代码并不意味着任何顺序; 更好但它意味着缺乏任何明确的顺序!

由于这个原因,隐含顺序的数据结构(如列表)对于解决并发问题并不是很有用。 列表意味着顺序,但它没有明确定义该顺序是什么。 因此,操作列表的代码的执行顺序将(在一定程度上)确定列表的隐式顺序,这与有效的并行解决方案直接冲突。

记住并发是数据问题,而不是代码问题! 您无法首先实现代码(或重写现有的顺序代码)并获得设计良好的并行解决方案。 您需要首先设计数据结构,同时牢记并发系统中不存在隐式排序。


如果读取数量大大超过写入数量,或者(无论频繁)写入是否是非并行的 ,则写入时复制方法可能是适当的。

下面显示的实现是

  • 无锁
  • 并发读取速度非常快 ,即使正在进行并发修改 - 无论它们需要多长时间
  • 因为“快照”是不可变的, 无锁原子性是可能的,即var snap = _list; snap[snap.Count - 1]; var snap = _list; snap[snap.Count - 1]; 将永远不会(当然,除了一个空的列表)抛出,并且你还可以免费获得快照语义的线程安全枚举..我是如何爱不变的!
  • 通用实施 ,适用于任何数据结构任何类型的修改
  • 死简单 ,即易于测试,调试,通过阅读代码进行验证
  • 可在.Net 3.5中使用

要使写入时复制起作用,必须使数据结构有效地保持不变 ,即在让其他线程可用后,不允许更改它们。 当你想修改时,你

  1. 克隆结构
  2. 对克隆进行修改
  3. 原子交换参照修改克隆

static class CopyOnWriteSwapper
{
    public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
        where T : class
    {
        while (true)
        {
            var objBefore = Volatile.Read(ref obj);
            var newObj = cloner(objBefore);
            op(newObj);
            if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
                return;
        }
    }
}

用法

CopyOnWriteSwapper.Swap(ref _myList,
    orig => new List<string>(orig),
    clone => clone.Add("asdf"));

如果你需要更多的性能,这将有助于ungenerify方法,例如为每种类型的修改(Add,Remove,...)创建一个方法,并且硬编码函数指针clonerop

NB#1您有责任确保没有人修改(据称)不可变的数据结构。 我们没有办法在通用的实现中做到这一点,但是当专注于List<T> ,您可以使用List.AsReadOnly()来防止修改。

注意#2小心列表中的值。 上面的写入复制方法仅保护其列表成员资格,但如果您不放置字符串,但其中还有其他可变对象,则必须注意线程安全性(例如锁定)。 但是这与这个解决方案是正交的,例如,可以毫无问题地容易地使用可变值的锁定。 你只需要知道它。

注意#3如果您的数据结构非常庞大并且经常修改它,则无论是在内存消耗还是所涉及的复制CPU成本方面,全副本复制方法都可能会令人望而却步。 在这种情况下,您可能想使用MS的不可变集合


试了一会儿 (也是: 在GitHub上 )。 我的实现有一些问题,我不会在这里介绍。 让我告诉你,更重要的是,我学到了什么。

首先,你不可能完全实现无锁和线程安全的IList<T> 。 特别是,随机插入和删除不会起作用,除非你也忘记了O(1)随机存取(即,除非你“欺骗”并且只是使用某种链接列表并让索引吸引)。

认为可能值得的是IList<T>一个线程安全的,有限的子集:特别是一个允许Add并通过索引提供随机只读访问的Insert (但不包含InsertRemoveAt等,还有没有随机访问)。

这是我的ConcurrentList<T>实现的目标。 但是当我在多线程场景中测试它的性能时,我发现简单地同步添加到List<T>更快 。 基本上,添加到List<T>已经很快闪电; 所涉及的计算步骤的复杂性很小(增加一个索引并将其分配给数组中的一个元素; 实际上就是这样 )。 你需要大量的并发写操作才能看到任何种类的锁争用; 即使如此,每次写入的平均性能仍然会在ConcurrentList<T>击败更昂贵但尽管无锁的实现。

在列表的内部阵列需要调整自身大小的相对罕见的事件中,您确实付出了很小的代价。 因此,最终我得出结论:这是一个小众场景,其中只有ConcurrentList<T>集合类型才有意义:当您希望保证每次调用时添加元素的开销较低(因此,与摊销的性能目标相反)。

这不像你想象的那样有用。


我实现了一个类似于Brian's 。 我的是不同的:

  • 我直接管理阵列。
  • 我不输入try块内的锁。
  • 我使用yield return来生成统计员。
  • 我支持锁定递归。 这允许在迭代期间从列表中读取。
  • 我尽可能使用可升级的读锁。
  • DoSyncGetSync方法允许需要独占访问列表的顺序交互。

代码

public class ConcurrentList<T> : IList<T>, IDisposable
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private int _count = 0;

    public int Count
    {
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _count;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    public int InternalArrayLength
    { 
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _arr.Length;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    private T[] _arr;

    public ConcurrentList(int initialCapacity)
    {
        _arr = new T[initialCapacity];
    }

    public ConcurrentList():this(4)
    { }

    public ConcurrentList(IEnumerable<T> items)
    {
        _arr = items.ToArray();
        _count = _arr.Length;
    }

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {       
            var newCount = _count + 1;          
            EnsureCapacity(newCount);           
            _arr[_count] = item;
            _count = newCount;                  
        }
        finally
        {
            _lock.ExitWriteLock();
        }       
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            throw new ArgumentNullException("items");

        _lock.EnterWriteLock();

        try
        {           
            var arr = items as T[] ?? items.ToArray();          
            var newCount = _count + arr.Length;
            EnsureCapacity(newCount);           
            Array.Copy(arr, 0, _arr, _count, arr.Length);       
            _count = newCount;
        }
        finally
        {
            _lock.ExitWriteLock();          
        }
    }

    private void EnsureCapacity(int capacity)
    {   
        if (_arr.Length >= capacity)
            return;

        int doubled;
        checked
        {
            try
            {           
                doubled = _arr.Length * 2;
            }
            catch (OverflowException)
            {
                doubled = int.MaxValue;
            }
        }

        var newLength = Math.Max(doubled, capacity);            
        Array.Resize(ref _arr, newLength);
    }

    public bool Remove(T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {           
            var i = IndexOfInternal(item);

            if (i == -1)
                return false;

            _lock.EnterWriteLock();
            try
            {   
                RemoveAtInternal(i);
                return true;
            }
            finally
            {               
                _lock.ExitWriteLock();
            }
        }
        finally
        {           
            _lock.ExitUpgradeableReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        _lock.EnterReadLock();

        try
        {    
            for (int i = 0; i < _count; i++)
                // deadlocking potential mitigated by lock recursion enforcement
                yield return _arr[i]; 
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public int IndexOf(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    private int IndexOfInternal(T item)
    {
        return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
    }

    public void Insert(int index, T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {                       
            if (index > _count)
                throw new ArgumentOutOfRangeException("index"); 

            _lock.EnterWriteLock();
            try
            {       
                var newCount = _count + 1;
                EnsureCapacity(newCount);

                // shift everything right by one, starting at index
                Array.Copy(_arr, index, _arr, index + 1, _count - index);

                // insert
                _arr[index] = item;     
                _count = newCount;
            }
            finally
            {           
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }


    }

    public void RemoveAt(int index)
    {   
        _lock.EnterUpgradeableReadLock();
        try
        {   
            if (index >= _count)
                throw new ArgumentOutOfRangeException("index");

            _lock.EnterWriteLock();
            try
            {           
                RemoveAtInternal(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }
    }

    private void RemoveAtInternal(int index)
    {           
        Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
        _count--;

        // release last element
        Array.Clear(_arr, _count, 1);
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {        
            Array.Clear(_arr, 0, _count);
            _count = 0;
        }
        finally
        {           
            _lock.ExitWriteLock();
        }   
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item) != -1;
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {       
        _lock.EnterReadLock();
        try
        {           
            if(_count > array.Length - arrayIndex)
                throw new ArgumentException("Destination array was not long enough.");

            Array.Copy(_arr, 0, array, arrayIndex, _count);
        }
        finally
        {
            _lock.ExitReadLock();           
        }
    }

    public bool IsReadOnly
    {   
        get { return false; }
    }

    public T this[int index]
    {
        get
        {
            _lock.EnterReadLock();
            try
            {           
                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                return _arr[index]; 
            }
            finally
            {
                _lock.ExitReadLock();               
            }           
        }
        set
        {
            _lock.EnterUpgradeableReadLock();
            try
            {

                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                _lock.EnterWriteLock();
                try
                {                       
                    _arr[index] = value;
                }
                finally
                {
                    _lock.ExitWriteLock();              
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }

        }
    }

    public void DoSync(Action<ConcurrentList<T>> action)
    {
        GetSync(l =>
        {
            action(l);
            return 0;
        });
    }

    public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
    {
        _lock.EnterWriteLock();
        try
        {           
            return func(this);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose()
    {   
        _lock.Dispose();
    }
}

没有ConcurrentList的原因是因为它从根本上不能写。 之所以这样,是因为IList中的一些重要操作依赖于索引,而这种简单的操作不起作用。 例如:

int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");

作者追求的效果是在“猫”之前插入“狗”,但在多线程环境中,这两行代码之间的列表可能会发生任何变化。 例如,另一个线程可能会做list.RemoveAt(0) ,将整个列表向左移动,但关键的是, catIndex不会改变。 这里的影响是Insert操作实际上会把“狗”放在猫后面 ,而不是放在它之前。

您所看到的针对此问题提供的“答案”的几种实现方式是良好的,但如上所示,它们不提供可靠的结果。 如果你真的想在多线程环境中使用类列表语义,你不能通过在列表实现方法中加入锁达到目的。 您必须确保您使用的任何索引完全位于锁的上下文中。 结果是您可以在多线程环境中使用带有正确锁定的List,但是该列表本身无法在该世界中存在。

如果你认为你需要一个并发列表,那实际上只有两种可能性:

  1. 你真正需要的是一个ConcurrentBag
  2. 您需要创建自己的集合,可能使用List和您自己的并发控制来实现。

如果你有一个ConcurrentBag并且处于需要将它作为IList传递的位置,那么你有一个问题,因为你调用的方法已经指定他们可能会尝试像上面那样用cat&狗。 在大多数世界中,这意味着你所调用的方法根本不是为了在多线程环境中工作而构建的。 这意味着你要么重构它,要么就是,如果你不能,你必须非常小心地处理它。 你几乎肯定会被要求用自己的锁创建你自己的集合,并在锁内调用违规方法。





task-parallel-library