关于jdk11新特性流操作的研究
以官方文档为参考, 见下图
使用图床上传的图片地址
有次操作是因为图中有较重要的信息, 使用最不损失画质的方法做参考
使用csdn的图片
关于流操作之中断操作的官方内容
流操作分为中间操作和终端操作,并组合成流管道。流管道由源(例如集合、数组、生成器函数或 I/O 通道)组成;后跟零个或多个中间操作,例如 Stream.filter 或 Stream.map;以及 Stream.forEach 或 Stream.reduce 等终端操作。
中间操作返回一个新流。他们总是懒惰;执行中间操作(如 filter())实际上不会执行任何筛选,而是创建一个新流,该流在遍历时包含与给定谓词匹配的初始流的元素。在执行管道的终端操作之前,不会开始遍历管道源。
终端操作(如 Stream.forEach 或 IntStream.sum)可能会遍历流以产生结果或副作用。执行终端操作后,流管道视为已消耗,不能再使用;如果需要再次遍历同一数据源,则必须返回到数据源以获取新流。在几乎所有情况下,终端操作都是急切的,在返回之前完成对数据源的遍历和管道的处理。只有终端操作 iterator() 和 spliterator() 不是;它们作为“逃生舱口”提供,以便在现有操作不足以完成任务时启用任意客户端控制的管道遍历。
延迟处理流可以显著提高效率;在管道(如上面的 filter-map-sum 示例)中,筛选、映射和求和可以融合到数据的单个传递中,具有最小的中间状态。懒惰还允许在不必要的情况下避免检查所有数据;对于诸如“查找长度超过 1000 个字符的第一个字符串”之类的操作,只需检查足够多的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。(当输入流是无限的,而不仅仅是大时,此行为变得更加重要。
中间操作进一步分为无状态操作和有状态操作。在处理新元素时,无状态操作(如 filter 和 map)不会保留以前看到的元素的状态 - 每个元素都可以独立于其他元素的操作进行处理。有状态操作(如 distinct 和 sorted)在处理新元素时可能会合并以前看到的元素的状态。
有状态操作可能需要在生成结果之前处理整个输入。例如,在看到流的所有元素之前,无法通过对流进行排序来产生任何结果。因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。包含独占无状态中间操作的管道可以在单次传递中处理,无论是顺序的还是并行的,只需最少的数据缓冲。
此外,某些操作被视为短路操作。如果中间操作在无限输入时可能产生有限的流,则中间操作是短路。如果终端操作在无限输入时可能在有限时间内终止,则该操作将短路。在管道中进行短路操作是处理无限流在有限时间内正常终止的必要条件,但不是充分条件。
排比
具有显式 for 循环的处理元素本质上是串行的。流通过将计算重构为聚合操作的管道,而不是对每个单独元素的命令性操作,从而促进并行执行。所有流操作都可以串行或并行执行。除非显式请求并行性,否则 JDK 中的流实现会创建串行流。例如,Collection 具有 java.util.Collection.stream 和 java.util.Collection.parallelStream 方法,它们分别生成顺序流和并行流;其他承载流的方法(如 IntStream.range(int, int))会生成顺序流,但这些流可以通过调用其 BaseStream.parallel() 方法有效地并行化。要并行执行先前的“小部件权重总和”查询,我们将执行以下操作:
int sumOfWeights = widgets.parallelStream() .filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();
此示例的串行版本和并行版本之间的唯一区别是使用“parallelStream()”而不是“stream()”创建初始流。流管道按顺序或并行执行,具体取决于调用终端操作的流的模式。可以使用 BaseStream.isParallel() 方法确定流的顺序或并行模式,并且可以使用 BaseStream.sequential() 和 BaseStream.parallel() 操作修改流的模式。最新的顺序或并行模式设置适用于整个流管道的执行。
除了标识为显式非确定性的操作(如 findAny())外,流是按顺序执行还是并行执行都不应更改计算结果。
大多数流操作都接受描述用户指定行为的参数,这些参数通常是 lambda 表达式。为了保持正确的行为,这些行为参数必须是无干扰的,并且在大多数情况下必须是无状态的。此类参数始终是函数接口(如 java.util.function.Function)的实例,并且通常是 lambda 表达式或方法引用。
互不干扰
流使您能够对各种数据源执行可能的并行聚合操作,甚至包括非线程安全的集合,如 ArrayList。只有当我们能够防止在执行流管道期间干扰数据源时,这才有可能。除了 escape-hatch 操作 iterator() 和 spliterator() 之外,执行在调用终端操作时开始,在终端操作完成时结束。对于大多数数据源,防止干扰意味着确保在流管道执行期间根本不修改数据源。值得注意的例外是源是并发集合的流,这些集合专门设计用于处理并发修改。并发流源是其 Spliterator 报告 CONCURRENT 特征的源。
因此,源可能不是并发的流管道中的行为参数不应修改流的数据源。如果行为参数修改或导致修改流的数据源,则该参数会干扰非并发数据源。对无干扰的需求适用于所有管道,而不仅仅是平行管道。除非流源是并发的,否则在执行流管道期间修改流的数据源可能会导致异常、不正确的答案或不合规的行为。对于表现良好的流源,可以在终端操作开始之前修改源,这些修改将反映在涵盖的元素中。例如,请考虑以下代码:
列表<String> l = new ArrayList(Arrays.asList(“one”, “two”)); 流<String> sl = l.stream(); l.add(“三”); 字符串 s = sl.collect(joining(“ ”));
首先创建一个由两个字符串组成的列表:“one”;和“两个”。然后,从该列表创建一个流。接下来,通过添加第三个字符串来修改列表:“three”。最后,将流的元素收集并连接在一起。由于列表是在终端收集操作开始之前修改的,因此结果将是“一、二、三”的字符串。从 JDK 集合返回的所有流以及大多数其他 JDK 类都以这种方式运行良好;对于其他库生成的流,请参阅低级流构造,了解构建行为良好的流的要求。
无状态行为
如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。有状态 lambda(或实现相应功能接口的其他对象)是其结果取决于在流管道执行期间可能更改的任何状态的 lambda 。有状态 lambda 的一个示例是 map() 的参数:
<Integer> 设置看到 = Collections.synchronizedSet(new HashSet<>()); stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
在这里,如果并行执行映射操作,则由于线程调度差异,同一输入的结果可能因运行而异,而使用无状态 lambda 表达式时,结果将始终相同。
另请注意,尝试从行为参数访问可变状态会给您在安全性和性能方面带来一个错误的选择;如果不同步对该状态的访问,则会出现数据争用,因此代码会中断,但如果确实同步了对该状态的访问,则可能会发生争用,从而破坏您寻求从中受益的并行性。最好的方法是完全避免流式传输操作的有状态行为参数;通常有一种方法可以重构流管道以避免有状态。
副作用
通常,不鼓励对流操作的行为参数产生副作用,因为它们通常会导致在不知不觉中违反无状态要求,以及其他线程安全隐患。
如果行为参数确实有副作用,除非明确说明,否则无法保证:
这些副作用对其他线程的可见性;
在同一线程中对同一流管道中的“同一”元素执行不同的操作;和
该行为参数始终被调用,因为如果流实现可以证明它不会影响计算结果,则可以自由地从流管道中省略操作(或整个阶段)。
副作用的顺序可能令人惊讶。即使管道被约束为生成与流源的遭遇顺序一致的结果
(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() must produce [0, 2, 4, 6, 8])
也无法保证映射器函数应用于单个元素的顺序, 或者在哪个线程中为给定元素执行任何行为参数。
消除副作用也可能令人惊讶。除了 forEach 和 forEachOrdered 的终端操作外,当流实现可以在不影响计算结果的情况下优化行为参数的执行时,行为参数的副作用可能并不总是执行。(有关特定示例,请参阅有关计数操作的 API 说明。
许多人们可能想使用副作用的计算可以更安全、更有效地表达,而不会产生副作用,例如使用约简而不是可变累加器。但是,副作用(例如使用 println() 进行调试)通常是无害的。少量的流操作,如 forEach() 和 peek(),只能通过副作用来操作;这些应小心使用。
作为如何将不当使用副作用的流管道转换为不使用副作用的流管道的示例,以下代码在字符串流中搜索与给定正则表达式匹配的字符串,并将匹配项放在列表中。
ArrayList<String> 结果 = new ArrayList<>(); stream.filter(s -> pattern.matcher(s).matches()) .forEach(s -> results.add(s));
不 必要的副作用使用!
此代码不必要地使用副作用。如果并行执行,ArrayList 的非线程安全性将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。此外,在这里使用副作用是完全没有必要的;forEach() 可以简单地替换为更安全、更高效、更适合并行化的约简操作:
列表<String>结果 = stream.filter(s -> pattern.matcher(s).matches()) .collect(Collectors.toList()); 无副作用!
流可能有也可能没有定义的相遇顺序。流是否具有相遇顺序取决于源和中间操作。某些流源(如 List 或数组)本质上是有序的,而其他流源(如 HashSet)则不是。某些中间操作(如 sorted())可能会对其他无序流施加相遇顺序,而其他操作可能会使有序流无序,例如 BaseStream.unordered()。此外,某些终端操作可能会忽略遭遇顺序,例如 forEach()。
如果对流进行排序,则大多数操作被约束为按其相遇顺序对元素进行操作;如果流的源是包含 [1, 2, 3] 的 List,则执行 map(x -> x*2) 的结果必须为 [2, 4, 6]。但是,如果源没有定义的相遇顺序,则值 [2, 4, 6] 的任何排列都将是有效的结果。
对于顺序流,相遇顺序的存在与否不会影响性能,只会影响确定性。如果对流进行排序,则在相同的源上重复执行相同的流管道将产生相同的结果;如果不排序,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以提高执行效率。如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复项 ( distinct()) 或分组缩减 ( Collectors.groupingBy())。同样,本质上与遇到顺序相关的操作(如 limit())可能需要缓冲以确保正确的排序,从而破坏了并行性的好处。如果流具有相遇顺序,但用户并不特别关心该相遇顺序,则使用 unordered() 显式取消排序流可能会提高某些有状态或终端操作的并行性能。但是,即使在排序约束下,大多数流管道(例如上面的“块权重总和”示例)仍然可以有效地并行化。
减少操作
约简运算(也称为折叠)采用一系列输入元素,并通过重复应用组合运算(例如查找一组数字的总和或最大值,或将元素累积到列表中)将它们组合成一个汇总结果。流类具有多种形式的常规约简操作,称为 reduce() 和 collect(),以及多种专用约简形式,例如 sum()、max() 或 count()。
当然,这样的操作可以很容易地实现为简单的顺序循环,如:
int 总和 = 0; for (int x : 数字) { sum += x; }
但是,有充分的理由更喜欢减少操作而不是上述突变累积。化约不仅“更抽象”——它在整个流上操作,而不是单个元素——而且正确构造的化简操作本质上是可并行的,只要用于处理元素的函数是关联的和无状态的。例如,给定一个我们想要找到总和的数字流,我们可以写成:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
艺术
int sum = numbers.stream().reduce(0, Integer::sum);
这些减少操作可以安全地并行运行,几乎不需要修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
约简可以很好地并行化,因为实现可以并行操作数据的子集,然后组合中间结果以获得最终的正确答案。(即使该语言具有“并行 for-each”结构,可变累积方法仍需要开发人员为共享累积变量总和提供线程安全更新,并且所需的同步可能会消除并行性带来的任何性能提升。相反,使用 reduce() 可以消除并行化 reduce 操作的所有负担,并且该库可以提供高效的并行实现,而无需额外的同步。
前面显示的“widgets”示例显示了 reduce 如何与其他操作相结合,以用批量操作替换 for 循环。如果 widgets 是 Widget 对象的集合,它们具有 getWeight 方法,我们可以使用以下命令找到最重的 widget:
OptionalInt heheavyest = widgets.parallelStream() .mapToInt(Widget::getWeight) .max();
在更一般的形式中,对生成类型结果的元素进行 reduce 操作<T> <U> 需要三个参数:
U <U> reduce(U 恒等式、BiFunction<U、? super T、U>累加器、BinaryOperator<U> 组合器);
在这里,标识元素既是约简的初始种子值,也是没有输入元素的默认结果。累加器函数采用部分结果和下一个元素,并生成新的部分结果。组合器函数将两个部分结果组合在一起,以生成新的部分结果。(在并行约简中,合路器是必要的,其中输入被分区,为每个分区计算部分累加,然后将部分结果组合以产生最终结果。
更正式地说,标识值必须是组合器函数的标识。这意味着对于所有 u,combiner.apply(identity, u) 等于 u。此外,组合器函数必须是关联的,并且必须与累加器函数兼容:对于所有 u 和 t,combiner.apply(u, accumulator.apply(identity, t)) 必须等于 accumulator.apply(u, t)。
三参数形式是双参数形式的推广,将映射步骤合并到累积步骤中。我们可以使用更通用的形式重新转换简单的权重总和示例,如下所示:
int sumOfWeights = widgets.stream() .reduce(0, (sum, b) -> sum + b.getWeight(), Integer::sum);
尽管显式的map-reduce形式更具可读性,因此通常应该首选。广义形式适用于通过将映射和约简组合到单个函数中来优化大量工作的情况。
可变性减少
可变缩减操作在处理流中的元素时,将输入元素累积到可变结果容器(如 Collection 或 StringBuilder)中。
如果我们想获取一个字符串流并将它们连接成一个长字符串,我们可以通过普通的简化来实现:
字符串连接 = strings.reduce(“”, String::concat)
我们会得到想要的结果,它甚至可以并行工作。但是,我们可能对性能不满意!这样的实现将执行大量的字符串复制,并且运行时间的字符数为 O(n^2)。一种更高性能的方法是将结果累积到 StringBuilder 中,这是一个用于累积字符串的可变容器。我们可以使用与普通约简相同的技术来并行化可变约简。
可变的约简操作称为 collect(),因为它将所需的结果收集到一个结果容器(如 Collection)中。收集操作需要三个函数:一个供应商函数,用于构造结果容器的新实例,一个累加器函数,用于将输入元素合并到结果容器中,以及一个组合函数,用于将一个结果容器的内容合并到另一个结果容器中。这种形式与普通还原的一般形式非常相似:
R collect(供应商供应商,BiConsumer<R, ?超级T>蓄能器,BiConsumer<R,R>合路器);
与 reduce() 一样,以这种抽象方式表达 collect 的一个好处是它直接适合并行化:我们可以并行累积部分结果,然后将它们组合起来,只要累积和组合函数满足适当的要求。例如,要将流中元素的 String 表示形式收集到 ArrayList 中,我们可以编写明显的顺序 for-each 形式:
ArrayList<String> 字符串 = new ArrayList<>(); for (T 元素 : stream) { strings.add(element.toString()); }
或者我们可以使用可并行化的集合形式:
ArrayList strings = stream.collect(() -> new ArrayList<>(), (c, e) -><String> c.add(e.toString()), (c1, c2) -> c1.addAll(c2));
或者,将映射操作从累加器函数中拉出,我们可以更简洁地表示为:
列表<String>字符串 = stream.map(Object::toString) .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
在这里,我们的供应商只是 ArrayList 构造函数,累加器将字符串化元素添加到 ArrayList 中,而组合器只需使用 addAll 将字符串从一个容器复制到另一个容器中。
收集的三个方面——供应商、蓄能器和合路器——是紧密耦合的。我们可以使用 Collector 的抽象来捕获所有三个方面。上面用于将字符串收集到 List 中的示例可以使用标准收集器重写为:
列表字符串 = stream.map(Object::toString) .collect(Collectors.toList());
将可变的约简打包到收集器中还有另一个优点:可组合性。Collectors 类包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。例如,假设我们有一个收集器,用于计算员工流的工资总和,如下所示:
Collector<Employee, ?, Integer> summingSalaries = Collectors.summingInt(Employee::getSalary);
(第二个类型参数的 ? 仅表示我们不关心此收集器使用的中间表示形式。如果我们想创建一个收集器来按部门列出工资总和,我们可以使用 groupingBy 重用 summingSalaries:
Map<Department, Integer> salariesByDept = employees.stream().collect(Collectors.groupingBy(Employee::getDepartment, summingSalaries));
与常规的约简操作一样,只有在满足适当条件的情况下,才能并行化 collect() 操作。对于任何部分累积的结果,将其与空结果容器组合必须产生等效的结果。也就是说,对于作为任何一系列累加器和合路器调用的结果的部分累积结果 p,p 必须等价于 combiner.apply(p, supplier.get())。
此外,无论计算如何拆分,它都必须产生等效的结果。对于任何输入元素 t1 和 t2,以下计算中的结果 r1 和 r2 必须等效:
一个 a1 = supplier.get(); 累加器.accept(a1, t1); 累加器.accept(a1, t2); R r1 = finisher.apply(a1); 结果没有拆分 A a2 = supplier.get(); 累加器.accept(a2, t1); 一个 a3 = supplier.get(); 累加器.accept(a3, t2); R r2 = finisher.apply(combiner.apply(a2, a3)); 拆分结果
在这里,等价通常意味着根据 Object.equals(Object)。但在某些情况下,等价性可能会放宽,以解释顺序上的差异。
缩减、并发和排序
使用一些复杂的约简操作,例如生成 Map 的 collect(),例如:
Map<Buyer, List<Transaction>> salesByBuyer = txns.parallelStream() .collect(Collectors.groupingBy(Transaction::getBuyer));
并行执行操作实际上可能会适得其反。这是因为对于某些 Map 实现来说,合并步骤(通过键将一个 Map 合并到另一个 Map 中)可能很昂贵。
但是,假设此缩减中使用的结果容器是一个可并发修改的集合,例如 java.util.concurrent.ConcurrentHashMap。在这种情况下,累加器的并行调用实际上可以将其结果同时存入同一个共享结果容器中,从而消除了合并器合并不同结果容器的需要。这可能会提高并行执行性能。我们称之为并发减少。
支持并发减少的收集器标有 Collector.Characteristics.CONCURRENT 特征。但是,并发集合也有缺点。如果多个线程同时将结果存放到共享容器中,则结果存放的顺序是不确定的。因此,只有当排序对于正在处理的流不重要时,才有可能进行并发减少。Stream.collect(Collector) 实现仅在以下情况下执行并发缩减
流是平行的;
收集器具有 Collector.Characteristics.CONCURRENT 特征,并且;
流是无序的,或者收集器具有 Collector.Characteristics.UNORDERED 特征。
您可以使用 BaseStream.unordered() 方法确保流是无序的。例如:
Map<Buyer, List<Transaction>> salesByBuyer = txns.parallelStream() .unordered() .collect(groupingByConcurrent(Transaction::getBuyer));
//(其中 Collectors.groupingByConcurrent 是 groupingBy 的并发等效项)。
请注意,如果给定键的元素按照它们在源中的出现顺序显示很重要,那么我们不能使用并发减少,因为排序是并发插入的受害者之一。然后,我们将被迫实现顺序缩减或基于合并的并行缩减。
关联性
如果满足以下条件,则运算符或函数运算是关联的:
(a op b) op c == a op (b op c)
如果我们将其扩展到四个术语,就可以看出这对平行评估的重要性:
a op b op c op d == (a op b) op (c op d)
因此,我们可以同时计算 (a op b) 和 (c op d),然后对结果调用 op 。
关联运算的示例包括数字加法、最小值和最大值以及字符串连接。
低位流建设
到目前为止,所有流示例都使用了 java.util.Collection.stream() 或 java.util.Arrays.stream(Object[]) 等方法来获取流。这些承载流的方法是如何实现的?
StreamSupport 类有许多用于创建流的低级方法,所有方法都使用某种形式的 java.util.Spliterator。拆分器是 java.util.Iterator 的并行模拟;它描述了一个(可能是无限的)元素集合,支持按顺序前进、批量遍历,以及将输入的某些部分拆分到另一个可以并行处理的拆分器中。在最低级别,所有流都由拆分器驱动。
在实现拆分器时,有许多实现选择,几乎所有选择都是在实现的简单性和使用该拆分器的流的运行时性能之间进行权衡。创建拆分器的最简单但性能最低的方法是使用 java.util.Spliterators.spliteratorUnknownSize(java.util.Iterator, int) 从迭代器创建一个拆分器。虽然这样的拆分器可以工作,但它可能会提供较差的并行性能,因为我们丢失了大小调整信息(基础数据集有多大),并且被限制在简单的拆分算法中。
更高质量的拆分器将提供平衡且已知大小的拆分、准确的大小调整信息以及拆分器或数据的许多其他特征,这些特征或数据可供实现用于优化执行。
可变数据源的 Spliterator 还有一个额外的挑战;绑定到数据的时间,因为数据可能会在创建拆分器的时间和执行流管道的时间之间发生变化。理想情况的拆分器将报告 IMMUTABLE 或 CONCURRENT 的特征;如果不是,它应该是后期绑定的。如果源不能直接提供推荐的拆分器,则可以使用 Supplier 间接提供拆分器,并通过 stream() 的 Supplier-accepted 版本构造流。只有在流道的终端运行开始后,才能从供应商处获得分流器。
这些要求大大减少了流源突变与流管道执行之间的潜在干扰范围。基于具有所需特征的拆分器的流,或使用基于供应商的工厂表单的流,在终端操作开始之前不受数据源修改的影响(前提是流操作的行为参数满足非干扰和无状态所需的条件)。有关详细信息,请参阅互不干扰。
{width=“50%” align=“center”}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!