#CompletableFuture 异步编程
在项目中处理异步任务时,很多同学会用 Future,但被问到"为什么还需要 CompletableFuture"和"thenCompose 和 thenCombine 有什么区别"时,往往说不清楚。我自己也踩过坑——异步任务出错了但没有处理,导致线上问题排查困难。
今天我们就来把这个异步编程工具彻底讲清楚。
#一、Future 的局限性
#1.1 Future 是什么
// 创建 Future
Future<String> future = executor.submit(() -> {
// 异步任务
return "result";
});
// 获取结果(阻塞)
String result = future.get(); // 阻塞等待
// 检查状态
boolean isDone = future.isDone();
boolean isCancelled = future.isCancelled();#1.2 Future 的问题
问题1:无法链式调用
// 错误:Future 不能链式调用
Future<String> future1 = executor.submit(() -> fetchUser());
Future<String> future2 = executor.submit(() -> fetchOrders(future1.get())); // 阻塞!
// 必须嵌套等待
String userId = future1.get(); // 阻塞
String orders = future2.get(); // 再次阻塞问题2:无法组合多个 Future
// 错误:无法等待多个 Future 完成
Future<String> f1 = fetchFromService1();
Future<String> f2 = fetchFromService2();
// 需要自己实现等待
while (!f1.isDone() || !f2.isDone()) {
Thread.sleep(100);
}
String r1 = f1.get();
String r2 = f2.get();问题3:无法处理异常
// 错误:Future 无法链式处理异常
Future<String> future = executor.submit(() -> {
throw new RuntimeException("error");
});
// 异常只能通过 get() 获取,链式处理困难#1.3 CompletableFuture 的优势
// CompletableFuture 可以链式调用
CompletableFuture.supplyAsync(() -> fetchUser())
.thenApply(user -> fetchOrders(user)) // 链式
.thenAccept(orders -> process(orders)) // 消费
.exceptionally(ex -> handleError(ex)); // 异常处理#二、创建 CompletableFuture
#2.1 基本创建
// 方式1:supplyAsync(有返回值)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "result";
});
// 方式2:runAsync(无返回值)
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务");
});
// 指定线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(
() -> fetchData(), pool
);#2.2 同步执行
// 使用 thenApply(同步转换)
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase()); // "HELLO"
// 使用 thenAccept(同步消费)
CompletableFuture
.supplyAsync(() -> "hello")
.thenAccept(s -> System.out.println(s)); // 立即打印#三、链式方法
#3.1 thenApply - 转换
// thenApply:转换结果
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s.length()); // Integer: 5#3.2 thenCompose - 扁平化
// 场景:获取用户后,再获取用户订单
// thenApply 返回 CompletableFuture<CompletableFuture<Order>>
CompletableFuture<CompletableFuture<Order>> bad = CompletableFuture
.supplyAsync(() -> fetchUser())
.thenApply(user -> fetchOrders(user));
// thenCompose 返回 CompletableFuture<Order>
CompletableFuture<Order> good = CompletableFuture
.supplyAsync(() -> fetchUser())
.thenCompose(user -> fetchOrders(user));#3.3 thenCombine - 组合
// 场景:并行获取用户信息和订单信息,然后合并
CompletableFuture<String> userFuture = CompletableFuture
.supplyAsync(() -> fetchUser());
CompletableFuture<String> orderFuture = CompletableFuture
.supplyAsync(() -> fetchOrders());
CompletableFuture<String> combined = userFuture
.thenCombine(orderFuture, (user, orders) ->
"User: " + user + ", Orders: " + orders
);#3.4 thenAccept - 消费
// thenAccept:消费结果,无返回值
CompletableFuture
.supplyAsync(() -> "hello")
.thenAccept(s -> System.out.println("Result: " + s));
// 输出:Result: hello#3.5 方法对比
| 方法 | 输入 | 输出 | 典型场景 |
|---|---|---|---|
thenApply | T | U | 同步转换 |
thenCompose | T | CompletableFuture<U> | 异步链式 |
thenCombine | T, U | V | 组合两个结果 |
thenAccept | T | void | 消费结果 |
#四、异步方法
#4.1 async 变体
// thenApply 是同步的,在上一个任务的线程执行
CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase()); // 同步
// thenApplyAsync 是异步的,在新线程执行
CompletableFuture
.supplyAsync(() -> "hello")
.thenApplyAsync(s -> s.toUpperCase()); // 异步#4.2 *Async 方法
// thenApply:继承上一个线程
.thenApply(Function) // 继承上一个线程
// thenApplyAsync:使用默认 ForkJoinPool
.thenApplyAsync(Function) // ForkJoinPool.commonPool()
// thenApplyAsync:使用自定义线程池
.thenApplyAsync(Function, executor) // 指定线程池#4.3 线程池选择
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture
.supplyAsync(() -> step1(), pool) // pool
.thenApplyAsync(s -> step2()) // 默认线程池!
.thenApplyAsync(s -> step3(), pool) // pool
.join();#五、异常处理
#5.1 exceptionally
// exceptionally:处理异常
CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("error");
return "success";
})
.exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return "default"; // 返回默认值
})
.thenAccept(result -> System.out.println("Result: " + result));
// 输出:Error: error
// Result: default#5.2 handle
// handle:无论成功失败都会调用
CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("error");
return "success";
})
.handle((result, ex) -> {
if (ex != null) {
return "handled: " + ex.getMessage();
}
return "result: " + result;
})
.thenAccept(s -> System.out.println(s));
// 输出:handled: error#5.3 whenComplete
// whenComplete:完成后执行,不修改结果
CompletableFuture
.supplyAsync(() -> "hello")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Error: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
})
.join();#六、多任务组合
#6.1 allOf - 等待所有
// 等待所有 CompletableFuture 完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");
CompletableFuture.allOf(f1, f2, f3).join();
// 获取结果
String r1 = f1.join();
String r2 = f2.join();
String r3 = f3.join();#6.2 anyOf - 任意一个
// 任意一个完成即可
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "A";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");
String first = CompletableFuture.anyOf(f1, f2, f3).join();
// 输出:B(因为 B 最快)#6.3 实际应用
// 并行查询多个接口,然后聚合
public CompletableFuture<AggregatedResult> queryAll(String id) {
CompletableFuture<user> userFuture = CompletableFuture
.supplyAsync(() -> queryUser(id));
CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> queryOrders(id));
CompletableFuture<Balance> balanceFuture = CompletableFuture
.supplyAsync(() -> queryBalance(id));
return CompletableFuture.allOf(userFuture, ordersFuture, balanceFuture)
.thenApply(v -> new AggregatedResult(
userFuture.join(),
ordersFuture.join(),
balanceFuture.join()
));
}#七、与并行流的对比
#7.1 并行流
// 使用并行流
List<String> results = list.parallelStream()
.map(this::process)
.collect(Collectors.toList());#7.2 CompletableFuture 优势
| 特性 | 并行流 | CompletableFuture |
|---|---|---|
| 线程池 | ForkJoinPool | 自定义线程池 |
| 异常处理 | 有限 | 完整(exceptionally) |
| 组合能力 | 弱 | 强(thenCompose) |
| 返回值 | 只能一种类型 | 多种类型 |
| 等待方式 | 只能全部等待 | 支持 anyOf |
#7.3 选型建议
| 场景 | 推荐 |
|---|---|
| 简单映射转换 | 并行流 |
| 需要异常处理 | CompletableFuture |
| 需要组合多个异步任务 | CompletableFuture |
| 需要灵活控制线程池 | CompletableFuture |
| 数据处理流水线 | 并行流 |
#八、常见问题与避坑
#8.1 忘记 join
// 错误:没有获取结果
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase());
// 任务可能还没执行完就返回了
// 正确:join 等待完成
CompletableFuture.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase())
.join();#8.2 不处理异常
// 错误:不处理异常,任务失败静默
CompletableFuture
.supplyAsync(() -> fetchData())
.thenApply(data -> process(data));
// 如果 fetchData 失败,没有日志
// 正确:处理异常
CompletableFuture
.supplyAsync(() -> fetchData())
.exceptionally(ex -> {
log.error("Fetch data failed", ex);
return null;
})
.thenAccept(data -> process(data));#8.3 阻塞主线程
// 错误:在异步任务中调用 get()/join()
CompletableFuture
.supplyAsync(() -> {
CompletableFuture<String> inner = fetchAsync();
return inner.get(); // 阻塞!
});
// 正确:使用 thenCompose 扁平化
CompletableFuture
.supplyAsync(() -> fetchAsync())
.thenCompose(result -> processAsync(result));#【学习小结】
本篇文章的核心要点:
- Future 的局限:无法链式调用、无法组合、无法处理异常
- CompletableFuture:支持链式编程、组合、异常处理的异步工具
- thenApply:同步转换,输入 T 返回 U
- thenCompose:异步扁平化,输入 T 返回
CompletableFuture<U> - thenCombine:组合两个异步任务的结果
- 异常处理:exceptionally、handle、whenComplete 三种方式
- allOf/anyOf:等待所有或任意一个完成
- 线程池选择:*Async 方法可指定线程池,thenApply 继承上一个线程