Stream 流操作(中间操作 / 终端操作)

Java 8 引入的 Stream 是函数式编程在 Java 的落地。很多同学用 Stream 写出很优雅的代码,但也有人写出性能极差的代码,或者遇到莫名其妙的"只执行了一半"的问题。

我见过有人这样写代码:

list.stream()
    .filter(x -> x > 0)
    .map(x -> x * 2)
    .sorted()
    .collect(Collectors.toList());

这段代码看起来很简洁,但它是怎么执行的?filter、map、sorted 会按顺序执行吗?

今天我们就来把 Stream 的惰性求值机制彻底讲透。

一、Stream 的基本概念

1.1 Stream 是什么

Stream 不是数据结构,而是一种数据处理的方式:

// 传统循环
List<Integer> result = new ArrayList<>();
for (Integer x : list) {
    if (x > 0) {
        result.add(x * 2);
    }
}

// Stream
List<Integer> result = list.stream()
    .filter(x -> x > 0)
    .map(x -> x * 2)
    .collect(Collectors.toList());

1.2 Stream 的组成

Stream 管道 = Source + 中间操作 + 终端操作

  数据源          中间操作              终端操作
    │               │                    │
    ▼               ▼                    ▼
┌────────┐   ┌────────────┐   ┌──────────────────┐
│  List  │ → │ filter()   │ → │ collect(Collectors│
│  Set   │   │ map()      │   │   .toList())     │
│  Array │   │ sorted()   │   │                  │
│  ...   │   │ distinct() │   │ forEach()        │
└────────┘   │ limit()    │   │ count()          │
             └────────────┘   │ reduce()          │
                               └──────────────────┘

1.3 【直观类比】

【直观类比】

Stream 就像一条流水线:

原材料        流水线机器              产品
  │               │                  │
  ▼               ▼                  ▼
[原料] ───→ [筛选机] ───→ [加工机] ───→ [成品]
              filter           map            toList

你不需要关心中间的每一个步骤,只需要定义"怎么筛选"、"怎么加工",最后指定"产出什么"。

二、中间操作 vs 终端操作

2.1 核心区别

类型特点返回值
中间操作(Intermediate)惰性执行,不触发处理Stream
终端操作(Terminal)触发整个管道执行非 Stream

2.2 惰性求值

中间操作是惰性的,只有遇到终端操作才会真正执行:

Stream<Integer> stream = list.stream()
    .filter(x -> {
        System.out.println("filter: " + x);  // 这行不会执行!
        return x > 0;
    })
    .map(x -> {
        System.out.println("map: " + x);  // 这行也不会执行!
        return x * 2;
    });

// 到这里为止,什么都没打印!

stream.collect(Collectors.toList());  // 遇到终端操作才开始执行

2.3 执行时机图解

未调用终端操作:
Stream<Integer> stream = list.stream()
    .filter(x -> { System.out.println("filter"); return x > 0; })
    .map(x -> { System.out.println("map"); return x * 2; });
// 什么都没发生

调用终端操作后:
stream.collect(Collectors.toList());

// 输出(按实际处理顺序):
// filter: 1
// map: 1
// filter: -2
// filter: 3
// map: 3
// filter: 0

三、常见中间操作

3.1 filter:过滤

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

// 筛选偶数
List<Integer> evens = numbers.stream()
    .filter(x -> x % 2 == 0)
    .collect(Collectors.toList());
// 结果:[2, 4, 6]

3.2 map:转换

List<String> names = Arrays.asList("alice", "bob", "charlie");

// 转大写
List<String> upper = names.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
// 结果:[ALICE, BOB, CHARLIE]

// 类型转换
List<Integer> lengths = names.stream()
    .map(String::length)
    .collect(Collectors.toList());
// 结果:[5, 3, 7]

3.3 flatMap:扁平化

flatMap 是 map + flatten:

List<List<Integer>> nested = Arrays.asList(
    Arrays.asList(1, 2),
    Arrays.asList(3, 4),
    Arrays.asList(5, 6)
);

// 普通 map(结果是嵌套列表)
List<List<Integer>> mapped = nested.stream()
    .map(list -> list.stream().map(x -> x * 2).collect(Collectors.toList()))
    .collect(Collectors.toList());
// 结果:[[2, 4], [6, 8], [10, 12]]

// flatMap(结果扁平化)
List<Integer> flat = nested.stream()
    .flatMap(list -> list.stream().map(x -> x * 2))
    .collect(Collectors.toList());
// 结果:[2, 4, 6, 8, 10, 12]

3.4 sorted:排序

List<String> names = Arrays.asList("Charlie", "Alice", "Bob");

// 自然排序
List<String> sorted = names.stream()
    .sorted()
    .collect(Collectors.toList());
// 结果:[Alice, Bob, Charlie]

// 自定义排序
List<String> byLength = names.stream()
    .sorted(Comparator.comparingInt(String::length))
    .collect(Collectors.toList());
// 结果:[Bob, Alice, Charlie]

3.5 distinct:去重

List<Integer> numbers = Arrays.asList(1, 2, 2, 3, 3, 3, 4);

List<Integer> distinct = numbers.stream()
    .distinct()
    .collect(Collectors.toList());
// 结果:[1, 2, 3, 4]

3.6 limit 和 skip:截断

List<Integer> numbers = IntStream.range(1, 100).boxed()
    .collect(Collectors.toList());

// 取前 10 个
List<Integer> limit = numbers.stream()
    .limit(10)
    .collect(Collectors.toList());

// 跳过前 10 个
List<Integer> skip = numbers.stream()
    .skip(10)
    .collect(Collectors.toList());

// 分页:第 2 页,每页 10 个
List<Integer> page2 = numbers.stream()
    .skip(10)
    .limit(10)
    .collect(Collectors.toList());

四、常见终端操作

4.1 collect:收集到集合

List<String> list = Arrays.asList("a", "b", "c");

// 收集到 List
List<String> result1 = list.stream().collect(Collectors.toList());

// 收集到 Set
Set<String> result2 = list.stream().collect(Collectors.toSet());

// 收集到 Map
Map<String, Integer> result3 = list.stream()
    .collect(Collectors.toMap(s -> s, String::length));

// 使用 toCollection 指定集合类型
LinkedList<String> linked = list.stream()
    .collect(Collectors.toCollection(LinkedList::new));

4.2 forEach:遍历

list.stream().forEach(System.out::println);

// 注意:forEachOrdered 保持顺序
list.parallelStream().forEachOrdered(System.out::println);

4.3 reduce:归约

// 求和
int sum = IntStream.range(1, 6).reduce(0, Integer::sum);
// 结果:15

// 求最大值
Optional<Integer> max = IntStream.range(1, 6)
    .boxed()
    .reduce(Integer::max);

// 字符串拼接
String joined = Arrays.asList("a", "b", "c").stream()
    .reduce("", (a, b) -> a + b);
// 结果:abc

4.4 findFirst / findAny:查找

// 查找第一个
Optional<Integer> first = list.stream()
    .filter(x -> x > 3)
    .findFirst();

// findAny 在并行流中返回任意一个,更快
Optional<Integer> any = list.parallelStream()
    .filter(x -> x > 3)
    .findAny();

4.5 anyMatch / allMatch / noneMatch:匹配

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

// 是否有任意一个大于 3
boolean any = list.stream().anyMatch(x -> x > 3);  // true

// 是否全部大于 0
boolean all = list.stream().allMatch(x -> x > 0); // true

// 是否全部不等于 10
boolean none = list.stream().noneMatch(x -> x == 10); // true

4.6 count / min / max

long count = list.stream().count();

Optional<Integer> min = list.stream().min(Integer::compareTo);
Optional<Integer> max = list.stream().max(Integer::compareTo);

五、短路操作

5.1 什么是短路操作

中间操作的短路:limit、skip、distinct 在某些情况下可以提前终止。

终端操作的短路:findFirst、findAny、anyMatch 等不需要遍历全部元素。

// limit 是短路操作
IntStream.range(1, Integer.MAX_VALUE)
    .filter(x -> x % 2 == 0)
    .limit(5)  // 找到 5 个偶数后就停止了
    .forEach(System.out::println);

// 输出:2, 4, 6, 8, 10
// 不会遍历到 Integer.MAX_VALUE

5.2 组合短路

// 找出第一个平方大于 50 的数
OptionalInt result = IntStream.range(1, 100)
    .filter(x -> x * x > 50)
    .findFirst();
// 输出:8(8*8=64>50)
// 只遍历到 8,不会继续

六、顺序流 vs 并行流

6.1 parallelStream

// 顺序流
list.stream().filter(x -> x > 0).collect(toList());

// 并行流
list.parallelStream().filter(x -> x > 0).collect(toList());

// 也可以从顺序流转并行流
list.stream().parallel().filter(x -> x > 0).collect(toList());

6.2 并行流的底层原理

并行流底层使用 ForkJoinPool.commonPool():

// ForkJoinPool 默认并行度 = CPU 核心数 - 1
// 4 核 CPU:并行度 = 3
// 可以通过系统属性修改
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=4

6.3 并行流不是万能的

// ❌ 错误:并行流不一定更快
// 小数据量:并行开销 > 串行优势
List<Integer> small = Arrays.asList(1, 2, 3);
small.parallelStream().filter(x -> x > 0).collect(toList()); // 串行更快

// CPU 密集型:并行效果好
// IO 密集型:并行效果可能不好(线程等待 IO)

6.4 并行流的注意事项

// ❌ 并行流中使用有副作用的操作
Set<String> set = ConcurrentHashMap.newKeySet();
list.parallelStream().forEach(set::add);  // 不确定的行为

// ✅ 使用正确的方式
Set<String> correct = list.parallelStream()
    .collect(Collectors.toSet());

// ❌ 顺序敏感的操作在并行流中可能出问题
List<Integer> orderSensitive = list.parallelStream()
    .collect(Collectors.toList());  // 不保证顺序!

七、【直观类比】管道执行顺序

【直观类比】

Stream 管道就像一条自助洗衣流水线:

           中间操作(洗涤步骤)

  ┌─────────────┼─────────────┐
  │             │             │
  ▼             ▼             ▼
[预洗] ───→ [主洗] ───→ [漂洗] ───→ [脱水] ───→ [烘干]
 filter      map         distinct     sorted      collect

关键是:所有步骤同时准备就绪,但只有在最后一步(终端操作)触发时,整个流水线才开始运转

而且,流水线是"拉取"模式:不是 filter 先把所有数据洗一遍再给 map,而是每来一个数据就经过所有步骤。

八、生产避坑

8.1 ❌ 错误示范:修改数据源

List<String> list = new ArrayList<>();
list.add("a");
list.add("b");

// ❌ 在 Stream 中修改数据源
list.stream().forEach(x -> {
    if ("a".equals(x)) {
        list.remove(x);  // ConcurrentModificationException!
    }
});

8.2 ❌ 错误示范:忘记终端操作

list.stream()
    .filter(x -> x > 0)
    .map(x -> x * 2);
// ❌ 没有终端操作,什么都不会执行!

8.3 ❌ 错误示范:Stream 被重复使用

Stream<String> stream = list.stream()
    .map(String::toUpperCase);

// 第一次消费
stream.collect(toList());

// ❌ 第二次消费会报错
stream.filter(x -> x.startsWith("A")).collect(toList());
// Stream 已经关闭,不能再使用

8.4 ✅ 正确示范:复用 Stream 的结果

// 把 Stream 的结果收集起来
List<String> result = list.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());

// 可以多次使用收集后的结果
result.stream().filter(x -> x.startsWith("A")).collect(toList());
result.stream().forEach(System.out::println);

九、面试追问链

第一层:惰性求值

面试官问:"Stream 的中间操作是立即执行的吗?"

不是。中间操作是惰性的,只有遇到终端操作才会触发整个管道的执行。这叫做惰性求值(Lazy Evaluation)。

第二层:执行原理

面试官追问:"Stream 是怎么处理数据的?是 filter 全部执行完再 map 吗?"

不是。Stream 使用"拉取"模式,当需要处理一个元素时,这个元素会依次经过所有中间操作,而不是等一个操作处理完所有元素再进入下一个操作。

第三层:并行流

面试官追问:"parallelStream 和普通 stream 有什么区别?什么时候用并行流?"

parallelStream 使用 ForkJoin 框架并行处理,适合数据量大、CPU 密集、顺序不敏感的场景。不适合小数据量、IO 密集、有状态操作。

第四层:性能优化

面试官追问:"有什么 Stream 性能优化的建议?"

数据量小用顺序流,数据量大、CPU 密集才考虑并行流;避免在 Lambda 中捕获可变对象;复杂管道可以适当拆分。

【学习小结】

  • Stream = Source + 中间操作 + 终端操作
  • 中间操作惰性执行,只有终端操作才触发
  • 短路操作可以提前终止
  • parallelStream 使用 ForkJoinPool,适合大数据量
  • 不要在 Stream 中修改数据源
  • Stream 只能用一次,记得保存结果