CompletableFuture 异步编程

候选人小冯在面试快手 P7 时,面试官问道:

"CompletableFuture 解决了什么问题?它和 Future 有什么区别?"

小冯说:"CompletableFuture 支持链式调用..."面试官追问:"thenApply 和 thenCompose 有什么区别?"

小冯答不上来。面试官继续:"CompletableFuture 的异常处理怎么做?"

小冯彻底卡住了...

一、核心问题:CompletableFuture 异步编程 🔴

1.1 问题拆解

第一层:问题背景(为什么需要?)
  "CompletableFuture 解决了什么问题?"
  考察点:回调地狱、Future 的局限

第二层:组合方法(怎么链式调用?)
  "thenApply、thenCompose、thenCombine 的区别是什么?"
  考察点:Function vs Function<Future<T>>、并行 vs 串行

第三层:异步执行(怎么实现?)
  "supplyAsync 和 thenApplyAsync 有什么区别?"
  考察点:ForkJoinPool、线程池配置

第四层:异常处理(怎么做?)
  "exceptionally、handle、whenComplete 的区别是什么?"
  考察点:异常恢复 vs 异常传递

1.2 ❌ 错误示范

候选人原话 A:"CompletableFuture 就是异步的 Future,没什么特别的。"

问题诊断:CompletableFuture 解决了 Future 无法组合、无法链式调用、无法处理异常的问题。它是函数式编程在异步场景的应用。

候选人原话 B:"thenApply 和 thenCompose 是一样的,都是转换。"

问题诊断:thenApply 是同步/异步的 Function 转换,thenCompose 是扁平化的转换(用于返回另一个 CompletableFuture 的场景)。

1.3 标准回答

P5 级别:解决的问题

Future 的局限性

Future 只能获取结果,不能链式组合多个 Future:

// Future 的地狱回调
Future<String> f1 = executor.submit(() -> fetchA());
// 无法优雅地组合 f1 和 f2
while (!f1.isDone()) { /* 轮询 */ }
String a = f1.get();
// → 回调地狱

CompletableFuture 的解决方案

CompletableFuture.supplyAsync(() -> fetchA())
    .thenApply(a -> a.toUpperCase())
    .thenCombine(CompletableFuture.supplyAsync(() -> fetchB()),
                 (a, b) -> a + b)
    .thenAccept(result -> System.out.println(result))
    .exceptionally(ex -> { /* 异常处理 */ });

链式调用,代码清晰。

P6 级别:组合方法详解

thenApply vs thenCompose

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "hello");

// thenApply:同步/异步 Function 转换
CompletableFuture<String> r1 = f.thenApply(s -> s + " world");
// 返回 CompletableFuture<String>

// thenCompose:扁平化,用于返回另一个 CompletableFuture
CompletableFuture<String> r2 = f.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " world"));
// 如果不用 thenCompose,结果会是 CompletableFuture<CompletableFuture<String>>

thenCombine vs thenAcceptBoth

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");

// thenCombine:组合两个 Future,返回一个新结果
CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + b);

// thenAcceptBoth:组合两个 Future,不返回结果
f1.thenAcceptBoth(f2, (a, b) -> System.out.println(a + b));

thenApply vs thenApplyAsync

// thenApply:在上一个阶段同一个线程执行
f.thenApply(x -> transform(x));  // 在 supplyAsync 的线程中执行

// thenApplyAsync:在不同的线程执行
f.thenApplyAsync(x -> transform(x));  // 在 ForkJoinPool.commonPool() 中执行

// 指定线程池
f.thenApplyAsync(x -> transform(x), customExecutor);

P7 级别:异常处理与线程池

exceptionally vs handle

CompletableFuture.supplyAsync(() -> riskyOperation())

// exceptionally:仅处理异常,正常流程直接通过
    .exceptionally(ex -> {
        return fallbackValue;  // 异常时返回替代值
    })

// handle:无论正常还是异常都会调用
    .handle((result, ex) -> {
        if (ex != null) {
            return fallbackValue;  // 异常处理
        }
        return result;  // 正常结果
    })

// whenComplete:类似 handle,但不修改结果
    .whenComplete((result, ex) -> {
        if (ex != null) {
            log.error(ex);
        }
    })

默认线程池

// supplyAsync 不指定线程池时,使用 ForkJoinPool.commonPool()
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "result");
// ForkJoinPool.commonPool() 的线程数 = availableProcessors() - 1

// 指定线程池
ExecutorService custom = Executors.newFixedThreadPool(10);
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "result", custom);

allOf vs anyOf

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");

// allOf:等待所有完成(不返回结果)
CompletableFuture.allOf(f1, f2).join();  // 阻塞直到两个都完成

// anyOf:等待任意一个完成
CompletableFuture.anyOf(f1, f2).join();  // 返回第一个完成的结果

【面试官心理】 这道题我能问到 P7 级别,是因为 CompletableFuture 是 JDK 8 异步编程的核心工具。能说清 thenApply vs thenCompose 差异的候选人说明他理解了函数式编程的范畴论基础(Monad 的 flatMap)。

1.4 追问升级

追问 1:CompletableFuture 的 join() 和 get() 有什么区别?

join()get() 功能相同,但 get() 抛出受检的 ExecutionException,而 join() 抛出未受检的 CompletionException

追问 2:CompletableFuture 怎么实现超时?

CompletableFuture.supplyAsync(() -> doWork())
    .completeOnTimeout(defaultValue, 5, TimeUnit.SECONDS)  // JDK 9+

CompletableFuture.supplyAsync(() -> doWork())
    .orTimeout(5, TimeUnit.SECONDS)  // JDK 9+,超时则异常

二、生产避坑 🟡

2.1 阻塞主线程

// 错误:join() 阻塞主线程
CompletableFuture.supplyAsync(() -> doWork())
    .thenApply(this::transform)
    .join();  // 主线程阻塞等待

2.2 异常被吞掉

// 错误:exceptionally 之后异常就消失了
CompletableFuture.supplyAsync(() -> risky())
    .exceptionally(ex -> null);  // 返回 null,异常丢失

// 正确:在 exceptionally 中记录日志
    .exceptionally(ex -> {
        log.error("Task failed", ex);
        return null;
    });

三、CompletableFuture 的最佳实践 🟡

3.1 合理的线程池配置

ExecutorService executor = new ThreadPoolExecutor(
    0, 100, 60L, TimeUnit.SECONDS,
    new SynchronousQueue<>(),
    new ThreadFactoryBuilder().setNameFormat("Async-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 所有异步任务使用同一个线程池
CompletableFuture.supplyAsync(() -> doWork(), executor)
    .thenApplyAsync(x -> transform(x), executor)
    .join();

3.2 取消任务

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // 定期检查中断标志
    }
    return "partial";
});

f.cancel(false);  // 不中断,只是标记为 CANCELLED
💡

面试加分点:能说出"CompletableFuture 的 thenApply 是同步还是异步取决于调用的方法——thenApply 是同步的,thenApplyAsync 是异步的",说明他理解了 JDK 8 异步调度的细节。

⚠️

面试陷阱:被问到"CompletableFuture.allOf 返回什么",很多人会说"返回所有结果"。准确答案是:返回 CompletableFuture<Void>,只有当所有子 CompletableFuture 都完成时才完成,结果本身不可用。