多线程并行 - c#多线程循环




Parallel.ForEach是否限制活动线程的数量? (4)

不,它不会启动1000个线程 - 是的,它会限制使用多少个线程。 并行扩展使用适当数量的核心,这取决于您实际拥有多少核心以及多少核心已经繁忙。 它为每个内核分配工作,然后使用称为“ 工作窃取”的技术,让每个线程都有效地处理自己的队列,并且只需要在需要时执行任何昂贵的跨线程访问。

查看PFX团队博客 ,了解关于如何分配工作和各种其他主题的大量信息。

请注意,在某些情况下,您也可以指定所需的并行度。

鉴于此代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

所有1000个线程几乎同时产生?


在单个核心机器上... Parallel.ForEach分区(块)是它在多个线程之间工作的集合,但是该数字是基于算法计算的,该算法考虑到并似乎持续监视由线程将分配给ForEach。 因此, 如果ForEach的主体部分调用长时间运行的IO绑定/阻塞函数,这会使线程等待,算法会产生更多的线程并重新分配它们之间的集合 。 如果线程快速完成并且不会在IO线程上阻塞,例如简单地计算一些数字, 则算法会将线程数提高(或确实下降)到算法认为吞吐量最佳的点(平均完成每次迭代的时间)

基本上,所有各种并行库函数的后面的线程池都可以找出最佳线程数量来使用。 物理处理器内核的数量仅构成等式的一部分。 核心数量和产生的线程数量之间并不存在简单的一对一关系。

我没有找到关于取消和处理同步线程的文档,这非常有帮助。 希望MS可以在MSDN中提供更好的示例。

不要忘记,正文代码必须写成在多个线程上运行,以及所有常见的线程安全考虑,框架不会抽象出这个因素。


很好的问题。 在你的例子中,即使在四核处理器上,并行化水平也非常低,但是有一些等待并行化的水平可能会相当高。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

现在看看添加等待操作来模拟HTTP请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我还没有做出任何改变,并发/并行化水平急剧上升。 ParallelOptions.MaxDegreeOfParallelism可以提高ParallelOptions.MaxDegreeOfParallelism

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我建议设置ParallelOptions.MaxDegreeOfParallelism 。 它不一定会增加使用的线程数量,但它会确保您只启动一个合理的线程数量,这似乎是您的担心。

最后回答你的问题,不,你不会立即开始所有的线程。 如果您想要完美地并行调用例如测试竞赛条件,请使用Parallel.Invoke。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}







parallel-processing