Stream 流操作(中间操作 / 终端操作)
Java 8 引入的 Stream 是函数式编程在 Java 的落地。很多同学用 Stream 写出很优雅的代码,但也有人写出性能极差的代码,或者遇到莫名其妙的"只执行了一半"的问题。
我见过有人这样写代码:
list.stream()
.filter(x -> x > 0)
.map(x -> x * 2)
.sorted()
.collect(Collectors.toList());
这段代码看起来很简洁,但它是怎么执行的?filter、map、sorted 会按顺序执行吗?
今天我们就来把 Stream 的惰性求值机制彻底讲透。
一、Stream 的基本概念
1.1 Stream 是什么
Stream 不是数据结构,而是一种数据处理的方式:
// 传统循环
List<Integer> result = new ArrayList<>();
for (Integer x : list) {
if (x > 0) {
result.add(x * 2);
}
}
// Stream
List<Integer> result = list.stream()
.filter(x -> x > 0)
.map(x -> x * 2)
.collect(Collectors.toList());
1.2 Stream 的组成
Stream 管道 = Source + 中间操作 + 终端操作
数据源 中间操作 终端操作
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────────┐ ┌──────────────────┐
│ List │ → │ filter() │ → │ collect(Collectors│
│ Set │ │ map() │ │ .toList()) │
│ Array │ │ sorted() │ │ │
│ ... │ │ distinct() │ │ forEach() │
└────────┘ │ limit() │ │ count() │
└────────────┘ │ reduce() │
└──────────────────┘
1.3 【直观类比】
【直观类比】
Stream 就像一条流水线:
原材料 流水线机器 产品
│ │ │
▼ ▼ ▼
[原料] ───→ [筛选机] ───→ [加工机] ───→ [成品]
filter map toList
你不需要关心中间的每一个步骤,只需要定义"怎么筛选"、"怎么加工",最后指定"产出什么"。
二、中间操作 vs 终端操作
2.1 核心区别
2.2 惰性求值
中间操作是惰性的,只有遇到终端操作才会真正执行:
Stream<Integer> stream = list.stream()
.filter(x -> {
System.out.println("filter: " + x); // 这行不会执行!
return x > 0;
})
.map(x -> {
System.out.println("map: " + x); // 这行也不会执行!
return x * 2;
});
// 到这里为止,什么都没打印!
stream.collect(Collectors.toList()); // 遇到终端操作才开始执行
2.3 执行时机图解
未调用终端操作:
Stream<Integer> stream = list.stream()
.filter(x -> { System.out.println("filter"); return x > 0; })
.map(x -> { System.out.println("map"); return x * 2; });
// 什么都没发生
调用终端操作后:
stream.collect(Collectors.toList());
// 输出(按实际处理顺序):
// filter: 1
// map: 1
// filter: -2
// filter: 3
// map: 3
// filter: 0
三、常见中间操作
3.1 filter:过滤
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
// 筛选偶数
List<Integer> evens = numbers.stream()
.filter(x -> x % 2 == 0)
.collect(Collectors.toList());
// 结果:[2, 4, 6]
3.2 map:转换
List<String> names = Arrays.asList("alice", "bob", "charlie");
// 转大写
List<String> upper = names.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
// 结果:[ALICE, BOB, CHARLIE]
// 类型转换
List<Integer> lengths = names.stream()
.map(String::length)
.collect(Collectors.toList());
// 结果:[5, 3, 7]
3.3 flatMap:扁平化
flatMap 是 map + flatten:
List<List<Integer>> nested = Arrays.asList(
Arrays.asList(1, 2),
Arrays.asList(3, 4),
Arrays.asList(5, 6)
);
// 普通 map(结果是嵌套列表)
List<List<Integer>> mapped = nested.stream()
.map(list -> list.stream().map(x -> x * 2).collect(Collectors.toList()))
.collect(Collectors.toList());
// 结果:[[2, 4], [6, 8], [10, 12]]
// flatMap(结果扁平化)
List<Integer> flat = nested.stream()
.flatMap(list -> list.stream().map(x -> x * 2))
.collect(Collectors.toList());
// 结果:[2, 4, 6, 8, 10, 12]
3.4 sorted:排序
List<String> names = Arrays.asList("Charlie", "Alice", "Bob");
// 自然排序
List<String> sorted = names.stream()
.sorted()
.collect(Collectors.toList());
// 结果:[Alice, Bob, Charlie]
// 自定义排序
List<String> byLength = names.stream()
.sorted(Comparator.comparingInt(String::length))
.collect(Collectors.toList());
// 结果:[Bob, Alice, Charlie]
3.5 distinct:去重
List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3, 4);
List<Integer> distinct = numbers.stream()
.distinct()
.collect(Collectors.toList());
// 结果:[1, 2, 3, 4]
3.6 limit 和 skip:截断
List<Integer> numbers = IntStream.range(1, 100).boxed()
.collect(Collectors.toList());
// 取前 10 个
List<Integer> limit = numbers.stream()
.limit(10)
.collect(Collectors.toList());
// 跳过前 10 个
List<Integer> skip = numbers.stream()
.skip(10)
.collect(Collectors.toList());
// 分页:第 2 页,每页 10 个
List<Integer> page2 = numbers.stream()
.skip(10)
.limit(10)
.collect(Collectors.toList());
四、常见终端操作
4.1 collect:收集到集合
List<String> list = Arrays.asList("a", "b", "c");
// 收集到 List
List<String> result1 = list.stream().collect(Collectors.toList());
// 收集到 Set
Set<String> result2 = list.stream().collect(Collectors.toSet());
// 收集到 Map
Map<String, Integer> result3 = list.stream()
.collect(Collectors.toMap(s -> s, String::length));
// 使用 toCollection 指定集合类型
LinkedList<String> linked = list.stream()
.collect(Collectors.toCollection(LinkedList::new));
4.2 forEach:遍历
list.stream().forEach(System.out::println);
// 注意:forEachOrdered 保持顺序
list.parallelStream().forEachOrdered(System.out::println);
4.3 reduce:归约
// 求和
int sum = IntStream.range(1, 6).reduce(0, Integer::sum);
// 结果:15
// 求最大值
Optional<Integer> max = IntStream.range(1, 6)
.boxed()
.reduce(Integer::max);
// 字符串拼接
String joined = Arrays.asList("a", "b", "c").stream()
.reduce("", (a, b) -> a + b);
// 结果:abc
4.4 findFirst / findAny:查找
// 查找第一个
Optional<Integer> first = list.stream()
.filter(x -> x > 3)
.findFirst();
// findAny 在并行流中返回任意一个,更快
Optional<Integer> any = list.parallelStream()
.filter(x -> x > 3)
.findAny();
4.5 anyMatch / allMatch / noneMatch:匹配
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 是否有任意一个大于 3
boolean any = list.stream().anyMatch(x -> x > 3); // true
// 是否全部大于 0
boolean all = list.stream().allMatch(x -> x > 0); // true
// 是否全部不等于 10
boolean none = list.stream().noneMatch(x -> x == 10); // true
4.6 count / min / max
long count = list.stream().count();
Optional<Integer> min = list.stream().min(Integer::compareTo);
Optional<Integer> max = list.stream().max(Integer::compareTo);
五、短路操作
5.1 什么是短路操作
中间操作的短路:limit、skip、distinct 在某些情况下可以提前终止。
终端操作的短路:findFirst、findAny、anyMatch 等不需要遍历全部元素。
// limit 是短路操作
IntStream.range(1, Integer.MAX_VALUE)
.filter(x -> x % 2 == 0)
.limit(5) // 找到 5 个偶数后就停止了
.forEach(System.out::println);
// 输出:2, 4, 6, 8, 10
// 不会遍历到 Integer.MAX_VALUE
5.2 组合短路
// 找出第一个平方大于 50 的数
OptionalInt result = IntStream.range(1, 100)
.filter(x -> x * x > 50)
.findFirst();
// 输出:8(8*8=64>50)
// 只遍历到 8,不会继续
六、顺序流 vs 并行流
6.1 parallelStream
// 顺序流
list.stream().filter(x -> x > 0).collect(toList());
// 并行流
list.parallelStream().filter(x -> x > 0).collect(toList());
// 也可以从顺序流转并行流
list.stream().parallel().filter(x -> x > 0).collect(toList());
6.2 并行流的底层原理
并行流底层使用 ForkJoinPool.commonPool():
// ForkJoinPool 默认并行度 = CPU 核心数 - 1
// 4 核 CPU:并行度 = 3
// 可以通过系统属性修改
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
6.3 并行流不是万能的
// ❌ 错误:并行流不一定更快
// 小数据量:并行开销 > 串行优势
List<Integer> small = Arrays.asList(1, 2, 3);
small.parallelStream().filter(x -> x > 0).collect(toList()); // 串行更快
// CPU 密集型:并行效果好
// IO 密集型:并行效果可能不好(线程等待 IO)
6.4 并行流的注意事项
// ❌ 并行流中使用有副作用的操作
Set<String> set = ConcurrentHashMap.newKeySet();
list.parallelStream().forEach(set::add); // 不确定的行为
// ✅ 使用正确的方式
Set<String> correct = list.parallelStream()
.collect(Collectors.toSet());
// ❌ 顺序敏感的操作在并行流中可能出问题
List<Integer> orderSensitive = list.parallelStream()
.collect(Collectors.toList()); // 不保证顺序!
七、【直观类比】管道执行顺序
【直观类比】
Stream 管道就像一条自助洗衣流水线:
中间操作(洗涤步骤)
│
┌─────────────┼─────────────┐
│ │ │
▼ ▼ ▼
[预洗] ───→ [主洗] ───→ [漂洗] ───→ [脱水] ───→ [烘干]
filter map distinct sorted collect
关键是:所有步骤同时准备就绪,但只有在最后一步(终端操作)触发时,整个流水线才开始运转。
而且,流水线是"拉取"模式:不是 filter 先把所有数据洗一遍再给 map,而是每来一个数据就经过所有步骤。
八、生产避坑
8.1 ❌ 错误示范:修改数据源
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
// ❌ 在 Stream 中修改数据源
list.stream().forEach(x -> {
if ("a".equals(x)) {
list.remove(x); // ConcurrentModificationException!
}
});
8.2 ❌ 错误示范:忘记终端操作
list.stream()
.filter(x -> x > 0)
.map(x -> x * 2);
// ❌ 没有终端操作,什么都不会执行!
8.3 ❌ 错误示范:Stream 被重复使用
Stream<String> stream = list.stream()
.map(String::toUpperCase);
// 第一次消费
stream.collect(toList());
// ❌ 第二次消费会报错
stream.filter(x -> x.startsWith("A")).collect(toList());
// Stream 已经关闭,不能再使用
8.4 ✅ 正确示范:复用 Stream 的结果
// 把 Stream 的结果收集起来
List<String> result = list.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
// 可以多次使用收集后的结果
result.stream().filter(x -> x.startsWith("A")).collect(toList());
result.stream().forEach(System.out::println);
九、面试追问链
第一层:惰性求值
面试官问:"Stream 的中间操作是立即执行的吗?"
不是。中间操作是惰性的,只有遇到终端操作才会触发整个管道的执行。这叫做惰性求值(Lazy Evaluation)。
第二层:执行原理
面试官追问:"Stream 是怎么处理数据的?是 filter 全部执行完再 map 吗?"
不是。Stream 使用"拉取"模式,当需要处理一个元素时,这个元素会依次经过所有中间操作,而不是等一个操作处理完所有元素再进入下一个操作。
第三层:并行流
面试官追问:"parallelStream 和普通 stream 有什么区别?什么时候用并行流?"
parallelStream 使用 ForkJoin 框架并行处理,适合数据量大、CPU 密集、顺序不敏感的场景。不适合小数据量、IO 密集、有状态操作。
第四层:性能优化
面试官追问:"有什么 Stream 性能优化的建议?"
数据量小用顺序流,数据量大、CPU 密集才考虑并行流;避免在 Lambda 中捕获可变对象;复杂管道可以适当拆分。
【学习小结】
- Stream = Source + 中间操作 + 终端操作
- 中间操作惰性执行,只有终端操作才触发
- 短路操作可以提前终止
- parallelStream 使用 ForkJoinPool,适合大数据量
- 不要在 Stream 中修改数据源
- Stream 只能用一次,记得保存结果