Fork/Join 框架

候选人小林在面试蚂蚁 P7 时,面试官问道:

"Fork/Join 是什么?它和普通线程池有什么区别?"

小林说:"用于分治任务的..."面试官追问:"工作窃取算法是什么?"

小林答不上来。面试官继续:"RecursiveTask 和 RecursiveAction 有什么区别?"

小林彻底卡住了...

一、核心问题:Fork/Join 框架 🔴

1.1 问题拆解

第一层:设计动机(为什么需要?)
  "Fork/Join 和普通线程池有什么区别?"
  考察点:分治任务、递归拆分、工作窃取

第二层:工作窃取(怎么做到?)
  "什么是工作窃取算法?它解决了什么问题?"
  考察点:双端队列、偷取者与被偷者

第三层:RecursiveTask(怎么用?)
  "RecursiveTask 和 RecursiveAction 的区别是什么?"
  考察点:有返回值 vs 无返回值

1.2 ❌ 错误示范

候选人原话 A:"Fork/Join 就是把大任务拆成小任务,没什么特别的。"

问题诊断:Fork/Join 的核心是工作窃取——当某个线程的队列空了,它可以从其他线程的队列尾部"偷取"任务。这解决了普通线程池中任务分配不均导致的线程饥饿问题。

候选人原话 B:"Fork/Join 适合所有并发任务。"

问题诊断:Fork/Join 适合计算密集型的递归分治任务(如并行排序、并行搜索)。对于 I/O 密集型任务,使用普通线程池更合适。

1.3 标准回答

P5 级别:设计动机

普通线程池的问题

普通线程池的任务分配是静态的——任务提交给线程池后,线程从队列取任务。如果任务大小不均匀,可能导致某些线程很忙而其他线程空闲。

Fork/Join 的解决方案

Fork/Join 使用工作窃取(Work Stealing)算法:

  1. 每个工作线程有自己的任务队列(双端队列)
  2. fork() 将任务放到自己的队列尾部
  3. 工作线程从自己的队列头部取任务
  4. 当自己的队列为空时,从其他线程的队列尾部偷取任务
// ForkJoin 计算并行和
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private int[] array;
    private int start, end;

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return Arrays.stream(array, start, end).asLongStream().sum();
        }

        int mid = (start + end) / 2;
        SumTask left = new SumTask(array, start, mid);
        SumTask right = new SumTask(array, mid, end);

        left.fork();   // 异步提交给其他线程
        Long rightResult = right.compute();  // 当前线程继续计算
        Long leftResult = left.join();       // 等待并获取结果

        return leftResult + rightResult;
    }
}

// 使用
ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new SumTask(array, 0, array.length));

P6 级别:工作窃取详解

双端队列的操作

graph LR
    A[工作线程A] -->|push()| B[双端队列]
    A -->|pop()| B
    C[工作线程B 偷取]|-->|steal()| B

    B -->|尾部| D[任务1 任务2 任务3]
    B -->|头部| E[最新任务]

设计哲学

  • push():新任务加入队列尾部(生产者)
  • pop():从队列头部取任务(消费者)
  • steal():从队列尾部偷取任务(偷取者)

这样做的好处:

  1. 自己的工作优先——减少与其他线程的竞争
  2. 从尾部偷取——尾部是最老的任务,通常也是最接近完成的任务
  3. 减少伪共享——头部和尾部分别被不同线程操作

P7 级别:RecursiveTask vs RecursiveAction

RecursiveTask<V>:有返回值的分治任务

abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    protected abstract V compute();
}

class BinarySearchTask extends RecursiveTask<Integer> {
    @Override
    protected Integer compute() {
        int result = binarySearch();
        if (needParallel()) {
            BinarySearchTask left = new BinarySearchTask(...);
            left.fork();
            Integer rightResult = compute();
            return Math.max(rightResult, left.join());
        }
        return result;
    }
}

RecursiveAction:无返回值的分治任务

abstract class RecursiveAction extends ForkJoinTask<Void> {
    protected abstract void compute();
}

class SortTask extends RecursiveAction {
    @Override
    protected void compute() {
        Arrays.sort(array, start, end);
        if (needParallel()) {
            SortTask left = new SortTask(...);
            SortTask right = new SortTask(...);
            invokeAll(left, right);  // 提交两个任务
        }
    }
}

ForkJoinPool 的并行度

ForkJoinPool pool = new ForkJoinPool();  // 默认并行度 = availableProcessors()
ForkJoinPool pool = new ForkJoinPool(8);  // 指定 8 个工作线程

// ForkJoinPool.commonPool() 默认并行度 = availableProcessors() - 1
ExecutorService common = ForkJoinPool.commonPool();

【面试官心理】 这道题我能问到 P7 级别,是因为 Fork/Join 的工作窃取算法是经典的分治并行算法设计。能说清双端队列操作分工的候选人说明他理解了减少竞争的思想。

1.4 追问升级

追问 1:Fork/Join 和 Stream parallelStream 的关系?

JDK 8 的 parallelStream() 内部使用 ForkJoinPool.commonPool()

List<String> result = list.parallelStream()
    .map(String::toUpperCase)
    .collect(Collectors.toList());
// 内部使用 ForkJoinPool.commonPool()

追问 2:Fork/Join 的线程数应该怎么配置?

对于 CPU 密集型任务,并行度 = CPU 核心数即可。

ForkJoinPool pool = new ForkJoinPool(
    Runtime.getRuntime().availableProcessors()  // CPU 核心数
);

二、与普通线程池的对比 🟡

2.1 适用场景

场景推荐线程池理由
CPU 密集型分治任务ForkJoinPool工作窃取,减少空闲
普通异步任务ThreadPoolExecutor更通用
I/O 密集型任务ThreadPoolExecutorForkJoinPool 默认不支持 I/O
JDK 8 parallelStreamForkJoinPool.commonPool自动使用

2.2 并行排序示例

// JDK 8 parallelSort(内部使用 Fork/Join)
int[] array = new int[1000000];
Arrays.parallelSort(array);  // ForkJoinPool.commonPool()

// 手动 Fork/Join
ForkJoinPool pool = new ForkJoinPool();
pool.submit(() -> Arrays.parallelSort(array, 0, array.length));
pool.shutdown();

三、生产避坑 🟡

3.1 阈值设置不当

// 错误:阈值太小,导致任务拆分过多,开销大于收益
protected static final int THRESHOLD = 10;

// 正确:经验值,任务足够大时拆分才有意义
protected static final int THRESHOLD = 1000;

3.2 递归深度过大

递归深度过大可能导致 StackOverflowError。使用 while 循环 + LinkedTransferQueue 的手动拆分模式更适合深度递归。

四、相关类 🟢

4.1 ForkJoinTask vs RecursiveTask

ForkJoinTask<V>RecursiveTask<V>RecursiveAction 的父类。ForkJoinTask 是抽象的,你通常继承 RecursiveTaskRecursiveAction

4.2 CountedCompleter

JDK 8 引入了 CountedCompleter,用于需要多个子任务完成后执行汇总操作的场景:

class TreeNodeProcessor extends CountedCompleter<Void> {
>    @Override
>    public void compute() {
>        // 处理节点
>        for (TreeNode child : children) {
>            addToPendingCount(1);  // 增加待完成计数
>            new TreeNodeProcessor(child, this).fork();
>        }
>        tryComplete();  // 完成后触发 onCompletion
>    }
>}
💡

面试加分点:能说出"JDK 8 的 CompletableFuture 内部使用 ForkJoinPool.commonPool(),这意味着如果你同时使用了 parallelStream() 和 CompletableFuture,它们会共享同一个线程池,可能导致相互影响",说明他对 JDK 8 并发工具有系统性理解。

⚠️

面试陷阱:被问到"ForkJoinPool 的 asyncMode 是什么意思",很多人会说"不知道"。准确答案是:asyncMode=true 表示任务以 FIFO 顺序异步执行,适合事件驱动的任务流模式;false 表示 LIFO,适合递归分治模式(默认)。