Fork/Join 框架
候选人小林在面试蚂蚁 P7 时,面试官问道:
"Fork/Join 是什么?它和普通线程池有什么区别?"
小林说:"用于分治任务的..."面试官追问:"工作窃取算法是什么?"
小林答不上来。面试官继续:"RecursiveTask 和 RecursiveAction 有什么区别?"
小林彻底卡住了...
一、核心问题:Fork/Join 框架 🔴
1.1 问题拆解
第一层:设计动机(为什么需要?)
"Fork/Join 和普通线程池有什么区别?"
考察点:分治任务、递归拆分、工作窃取
第二层:工作窃取(怎么做到?)
"什么是工作窃取算法?它解决了什么问题?"
考察点:双端队列、偷取者与被偷者
第三层:RecursiveTask(怎么用?)
"RecursiveTask 和 RecursiveAction 的区别是什么?"
考察点:有返回值 vs 无返回值
1.2 ❌ 错误示范
候选人原话 A:"Fork/Join 就是把大任务拆成小任务,没什么特别的。"
问题诊断:Fork/Join 的核心是工作窃取——当某个线程的队列空了,它可以从其他线程的队列尾部"偷取"任务。这解决了普通线程池中任务分配不均导致的线程饥饿问题。
候选人原话 B:"Fork/Join 适合所有并发任务。"
问题诊断:Fork/Join 适合计算密集型的递归分治任务(如并行排序、并行搜索)。对于 I/O 密集型任务,使用普通线程池更合适。
1.3 标准回答
P5 级别:设计动机
普通线程池的问题:
普通线程池的任务分配是静态的——任务提交给线程池后,线程从队列取任务。如果任务大小不均匀,可能导致某些线程很忙而其他线程空闲。
Fork/Join 的解决方案:
Fork/Join 使用工作窃取(Work Stealing)算法:
- 每个工作线程有自己的任务队列(双端队列)
- fork() 将任务放到自己的队列尾部
- 工作线程从自己的队列头部取任务
- 当自己的队列为空时,从其他线程的队列尾部偷取任务
// ForkJoin 计算并行和
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private int[] array;
private int start, end;
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return Arrays.stream(array, start, end).asLongStream().sum();
}
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步提交给其他线程
Long rightResult = right.compute(); // 当前线程继续计算
Long leftResult = left.join(); // 等待并获取结果
return leftResult + rightResult;
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new SumTask(array, 0, array.length));
P6 级别:工作窃取详解
双端队列的操作:
graph LR
A[工作线程A] -->|push()| B[双端队列]
A -->|pop()| B
C[工作线程B 偷取]|-->|steal()| B
B -->|尾部| D[任务1 任务2 任务3]
B -->|头部| E[最新任务]
设计哲学:
- push():新任务加入队列尾部(生产者)
- pop():从队列头部取任务(消费者)
- steal():从队列尾部偷取任务(偷取者)
这样做的好处:
- 自己的工作优先——减少与其他线程的竞争
- 从尾部偷取——尾部是最老的任务,通常也是最接近完成的任务
- 减少伪共享——头部和尾部分别被不同线程操作
P7 级别:RecursiveTask vs RecursiveAction
RecursiveTask<V>:有返回值的分治任务
abstract class RecursiveTask<V> extends ForkJoinTask<V> {
protected abstract V compute();
}
class BinarySearchTask extends RecursiveTask<Integer> {
@Override
protected Integer compute() {
int result = binarySearch();
if (needParallel()) {
BinarySearchTask left = new BinarySearchTask(...);
left.fork();
Integer rightResult = compute();
return Math.max(rightResult, left.join());
}
return result;
}
}
RecursiveAction:无返回值的分治任务
abstract class RecursiveAction extends ForkJoinTask<Void> {
protected abstract void compute();
}
class SortTask extends RecursiveAction {
@Override
protected void compute() {
Arrays.sort(array, start, end);
if (needParallel()) {
SortTask left = new SortTask(...);
SortTask right = new SortTask(...);
invokeAll(left, right); // 提交两个任务
}
}
}
ForkJoinPool 的并行度:
ForkJoinPool pool = new ForkJoinPool(); // 默认并行度 = availableProcessors()
ForkJoinPool pool = new ForkJoinPool(8); // 指定 8 个工作线程
// ForkJoinPool.commonPool() 默认并行度 = availableProcessors() - 1
ExecutorService common = ForkJoinPool.commonPool();
【面试官心理】
这道题我能问到 P7 级别,是因为 Fork/Join 的工作窃取算法是经典的分治并行算法设计。能说清双端队列操作分工的候选人说明他理解了减少竞争的思想。
1.4 追问升级
追问 1:Fork/Join 和 Stream parallelStream 的关系?
JDK 8 的 parallelStream() 内部使用 ForkJoinPool.commonPool():
List<String> result = list.parallelStream()
.map(String::toUpperCase)
.collect(Collectors.toList());
// 内部使用 ForkJoinPool.commonPool()
追问 2:Fork/Join 的线程数应该怎么配置?
对于 CPU 密集型任务,并行度 = CPU 核心数即可。
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors() // CPU 核心数
);
二、与普通线程池的对比 🟡
2.1 适用场景
2.2 并行排序示例
// JDK 8 parallelSort(内部使用 Fork/Join)
int[] array = new int[1000000];
Arrays.parallelSort(array); // ForkJoinPool.commonPool()
// 手动 Fork/Join
ForkJoinPool pool = new ForkJoinPool();
pool.submit(() -> Arrays.parallelSort(array, 0, array.length));
pool.shutdown();
三、生产避坑 🟡
3.1 阈值设置不当
// 错误:阈值太小,导致任务拆分过多,开销大于收益
protected static final int THRESHOLD = 10;
// 正确:经验值,任务足够大时拆分才有意义
protected static final int THRESHOLD = 1000;
3.2 递归深度过大
递归深度过大可能导致 StackOverflowError。使用 while 循环 + LinkedTransferQueue 的手动拆分模式更适合深度递归。
四、相关类 🟢
4.1 ForkJoinTask vs RecursiveTask
ForkJoinTask<V> 是 RecursiveTask<V> 和 RecursiveAction 的父类。ForkJoinTask 是抽象的,你通常继承 RecursiveTask 或 RecursiveAction。
4.2 CountedCompleter
JDK 8 引入了 CountedCompleter,用于需要多个子任务完成后执行汇总操作的场景:
class TreeNodeProcessor extends CountedCompleter<Void> {
> @Override
> public void compute() {
> // 处理节点
> for (TreeNode child : children) {
> addToPendingCount(1); // 增加待完成计数
> new TreeNodeProcessor(child, this).fork();
> }
> tryComplete(); // 完成后触发 onCompletion
> }
>}
💡
面试加分点:能说出"JDK 8 的 CompletableFuture 内部使用 ForkJoinPool.commonPool(),这意味着如果你同时使用了 parallelStream() 和 CompletableFuture,它们会共享同一个线程池,可能导致相互影响",说明他对 JDK 8 并发工具有系统性理解。
⚠️
面试陷阱:被问到"ForkJoinPool 的 asyncMode 是什么意思",很多人会说"不知道"。准确答案是:asyncMode=true 表示任务以 FIFO 顺序异步执行,适合事件驱动的任务流模式;false 表示 LIFO,适合递归分治模式(默认)。