java - completablefuture线程池




Java 8并行流中的自定义线程池 (6)

是否可以为Java 8 并行流指定自定义线程池? 我无法在任何地方找到它。

想象一下,我有一个服务器应用程序,我想使用并行流。 但是这个应用程序很大且多线程,所以我想划分它。 我不想在另一个模块的应用程序块任务的一个模块中执行运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数真实世界的情况下安全地使用并行流。

试试下面的例子。 有一些CPU密集型任务在不同的线程中执行。 这些任务利用并行流。 第一项任务被破坏,所以每一步都需要1秒(由线程休眠模拟)。 问题在于其他线程卡住了,等待中断的任务完成。 这是一个人为的例子,但想象一下servlet应用程序和某人向共享fork连接池提交长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}


到现在为止,我使用了这个问题的答案中描述的解决方案。 现在,我想出了一个名为Parallel Stream Support的小库,

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

但正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在很多缺点,这很大程度上取决于公共池的大小。 请参阅HashSet中的并行流不会并行运行

我正在使用此解决方案仅为不同类型的工作分开存储池,但即使不使用它,我也无法将公用池的大小设置为1。


如果您不介意使用第三方库,使用cyclops-react您可以在同一管道中混合顺序流和并行流,并提供定制的ForkJoinPools。 例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

或者如果我们希望继续在顺序流内处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[披露我是独眼巨人反应的首席开发者]


实际上有一个技巧是如何在特定的fork-join池中执行并行操作的。 如果您将其作为fork-join池中的任务执行,则它将停留在此处并且不会使用常见的任务。

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

技巧基于ForkJoinTask.fork ,它指定:“如果适用,安排异步执行当前任务运行的池中的任务,如果不是inForkJoinPool(),则使用ForkJoinPool.commonPool()”


要测量使用的线程的实际数量,可以检查Thread.activeCount()

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

没有.parallel()它给:

3 // common pool
4 // custom pool

除了在您自己的forkJoinPool中触发并行计算的技巧,您还可以将该池传递给CompletableFuture.supplyAsync方法,如下所示:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);




java-stream