[Java] 你能将一个流分成两个流吗?


Answers

收集器可以用于此。

  • 对于两个类别,请使用Collectors.partitioningBy()工厂。

这将创建一个从BooleanListMap ,并根据Predicate将项目放入其中一个或另一个列表中。

注意:由于流需要被全部使用,所以这无法在无限流上工作。 因为无论如何都会使用流,所以此方法只是将它们放入列表中,而不是创建具有内存的新流。

此外,不需要迭代器,即使在您提供的仅头部示例中也不需要。

Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • 有关更多类别,请使用Collectors.groupingBy()工厂。
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是Stream ,而是像IntStream这样的基本流IntStream ,则此.collect(Collectors)方法不可用。 您必须在没有收集器工厂的情况下以手动方式进行。 它的实现看起来像这样:

IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000);

Predicate<Integer> p = x -> r.nextBoolean();
Map<Boolean, List<Integer>> groups = intStream.collect(() -> {
    Map<Boolean, List<Integer>> map = new HashMap<>();
    map.put(false, new ArrayList<>());
    map.put(true, new ArrayList<>());
    return map;
}, (map, x) -> {
    boolean partition = p.test(x);
    List<Integer> list = map.get(partition);
    list.add(x);
}, (map1, map2) -> {
    map1.get(false).addAll(map2.get(false));
    map1.get(true).addAll(map2.get(true));
});

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());

编辑

正如指出的那样,上述“解决方法”不是线程安全的。 在收集之前转换为正常的Stream是一种方式:

Stream<Integer> stream = intStream.boxed();
Question

我有一个由Java 8流表示的数据集:

Stream<T> stream = ...;

例如,我可以看到如何过滤它以获得一个随机子集

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我还可以看到如何减少这个数据流以获得,例如,两个列表代表数据集的两个随机半数,然后将这些数据集转换回流。 但是,是否有直接的方法从最初的一个生成两个流? 就像是

(heads, tails) = stream.[some kind of split based on filter]

感谢您的任何见解。




这违背了Stream的一般机制。 假设你可以将Stream S0分解为Sa和Sb,就像你想的那样。 在Sa上执行任何终端操作(例如count()将必然“消耗”S0中的所有元素。 因此Sb丢失了它的数据源。

以前,Stream有一个tee()方法,我认为它将一个流复制到两个。 它现在被删除。

尽管Stream有一个peek()方法,但你可以使用它来实现你的需求。




不完全是,但您可能能够通过调用Collectors.groupingBy()来完成所需的任务。 您创建一个新的集合,然后可以在该新集合上实例化流。




我偶然发现了这个问题,同时寻找一种方法来过滤流中的某些元素并将它们记录为错误。 所以我并没有真正需要对流进行分割,而是用一种不显眼的语法将一个过早的终止动作附加到一个谓词上。 这就是我想到的:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}