CompletableFuture 异步编程

在项目中处理异步任务时,很多同学会用 Future,但被问到"为什么还需要 CompletableFuture"和"thenCompose 和 thenCombine 有什么区别"时,往往说不清楚。我自己也踩过坑——异步任务出错了但没有处理,导致线上问题排查困难。

今天我们就来把这个异步编程工具彻底讲清楚。

一、Future 的局限性

1.1 Future 是什么

// 创建 Future
Future<String> future = executor.submit(() -> {
    // 异步任务
    return "result";
});

// 获取结果(阻塞)
String result = future.get();  // 阻塞等待

// 检查状态
boolean isDone = future.isDone();
boolean isCancelled = future.isCancelled();

1.2 Future 的问题

问题1:无法链式调用

// 错误:Future 不能链式调用
Future<String> future1 = executor.submit(() -> fetchUser());
Future<String> future2 = executor.submit(() -> fetchOrders(future1.get()));  // 阻塞!

// 必须嵌套等待
String userId = future1.get();  // 阻塞
String orders = future2.get();  // 再次阻塞

问题2:无法组合多个 Future

// 错误:无法等待多个 Future 完成
Future<String> f1 = fetchFromService1();
Future<String> f2 = fetchFromService2();

// 需要自己实现等待
while (!f1.isDone() || !f2.isDone()) {
    Thread.sleep(100);
}
String r1 = f1.get();
String r2 = f2.get();

问题3:无法处理异常

// 错误:Future 无法链式处理异常
Future<String> future = executor.submit(() -> {
    throw new RuntimeException("error");
});
// 异常只能通过 get() 获取,链式处理困难

1.3 CompletableFuture 的优势

// CompletableFuture 可以链式调用
CompletableFuture.supplyAsync(() -> fetchUser())
    .thenApply(user -> fetchOrders(user))  // 链式
    .thenAccept(orders -> process(orders))  // 消费
    .exceptionally(ex -> handleError(ex));  // 异常处理

二、创建 CompletableFuture

2.1 基本创建

// 方式1:supplyAsync(有返回值)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "result";
});

// 方式2:runAsync(无返回值)
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("异步任务");
});

// 指定线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(
    () -> fetchData(), pool
);

2.2 同步执行

// 使用 thenApply(同步转换)
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenApply(s -> s.toUpperCase());  // "HELLO"

// 使用 thenAccept(同步消费)
CompletableFuture
    .supplyAsync(() -> "hello")
    .thenAccept(s -> System.out.println(s));  // 立即打印

三、链式方法

3.1 thenApply - 转换

// thenApply:转换结果
CompletableFuture<Integer> future = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenApply(s -> s.length());  // Integer: 5

3.2 thenCompose - 扁平化

// 场景:获取用户后,再获取用户订单
// thenApply 返回 CompletableFuture<CompletableFuture<Order>>
CompletableFuture<CompletableFuture<Order>> bad = CompletableFuture
    .supplyAsync(() -> fetchUser())
    .thenApply(user -> fetchOrders(user));

// thenCompose 返回 CompletableFuture<Order>
CompletableFuture<Order> good = CompletableFuture
    .supplyAsync(() -> fetchUser())
    .thenCompose(user -> fetchOrders(user));

3.3 thenCombine - 组合

// 场景:并行获取用户信息和订单信息,然后合并
CompletableFuture<String> userFuture = CompletableFuture
    .supplyAsync(() -> fetchUser());

CompletableFuture<String> orderFuture = CompletableFuture
    .supplyAsync(() -> fetchOrders());

CompletableFuture<String> combined = userFuture
    .thenCombine(orderFuture, (user, orders) -> 
        "User: " + user + ", Orders: " + orders
    );

3.4 thenAccept - 消费

// thenAccept:消费结果,无返回值
CompletableFuture
    .supplyAsync(() -> "hello")
    .thenAccept(s -> System.out.println("Result: " + s));
// 输出:Result: hello

3.5 方法对比

方法输入输出典型场景
thenApplyTU同步转换
thenComposeTCompletableFuture<U>异步链式
thenCombineT, UV组合两个结果
thenAcceptTvoid消费结果

四、异步方法

4.1 async 变体

// thenApply 是同步的,在上一个任务的线程执行
CompletableFuture
    .supplyAsync(() -> "hello")
    .thenApply(s -> s.toUpperCase());  // 同步

// thenApplyAsync 是异步的,在新线程执行
CompletableFuture
    .supplyAsync(() -> "hello")
    .thenApplyAsync(s -> s.toUpperCase());  // 异步

4.2 *Async 方法

// thenApply:继承上一个线程
.thenApply(Function)           // 继承上一个线程

// thenApplyAsync:使用默认 ForkJoinPool
.thenApplyAsync(Function)       // ForkJoinPool.commonPool()

// thenApplyAsync:使用自定义线程池
.thenApplyAsync(Function, executor)  // 指定线程池

4.3 线程池选择

ExecutorService pool = Executors.newFixedThreadPool(10);

CompletableFuture
    .supplyAsync(() -> step1(), pool)  // pool
    .thenApplyAsync(s -> step2())     // 默认线程池!
    .thenApplyAsync(s -> step3(), pool) // pool
    .join();

五、异常处理

5.1 exceptionally

// exceptionally:处理异常
CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("error");
        return "success";
    })
    .exceptionally(ex -> {
        System.out.println("Error: " + ex.getMessage());
        return "default";  // 返回默认值
    })
    .thenAccept(result -> System.out.println("Result: " + result));
// 输出:Error: error
//       Result: default

5.2 handle

// handle:无论成功失败都会调用
CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("error");
        return "success";
    })
    .handle((result, ex) -> {
        if (ex != null) {
            return "handled: " + ex.getMessage();
        }
        return "result: " + result;
    })
    .thenAccept(s -> System.out.println(s));
// 输出:handled: error

5.3 whenComplete

// whenComplete:完成后执行,不修改结果
CompletableFuture
    .supplyAsync(() -> "hello")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            System.out.println("Error: " + ex.getMessage());
        } else {
            System.out.println("Result: " + result);
        }
    })
    .join();

六、多任务组合

6.1 allOf - 等待所有

// 等待所有 CompletableFuture 完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");

CompletableFuture.allOf(f1, f2, f3).join();

// 获取结果
String r1 = f1.join();
String r2 = f2.join();
String r3 = f3.join();

6.2 anyOf - 任意一个

// 任意一个完成即可
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);
    return "A";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "C");

String first = CompletableFuture.anyOf(f1, f2, f3).join();
// 输出:B(因为 B 最快)

6.3 实际应用

// 并行查询多个接口,然后聚合
public CompletableFuture<AggregatedResult> queryAll(String id) {
    CompletableFuture<user> userFuture = CompletableFuture
        .supplyAsync(() -> queryUser(id));
    
    CompletableFuture<List<Order>> ordersFuture = CompletableFuture
        .supplyAsync(() -> queryOrders(id));
    
    CompletableFuture<Balance> balanceFuture = CompletableFuture
        .supplyAsync(() -> queryBalance(id));
    
    return CompletableFuture.allOf(userFuture, ordersFuture, balanceFuture)
        .thenApply(v -> new AggregatedResult(
            userFuture.join(),
            ordersFuture.join(),
            balanceFuture.join()
        ));
}

七、与并行流的对比

7.1 并行流

// 使用并行流
List<String> results = list.parallelStream()
    .map(this::process)
    .collect(Collectors.toList());

7.2 CompletableFuture 优势

特性并行流CompletableFuture
线程池ForkJoinPool自定义线程池
异常处理有限完整(exceptionally)
组合能力强(thenCompose)
返回值只能一种类型多种类型
等待方式只能全部等待支持 anyOf

7.3 选型建议

场景推荐
简单映射转换并行流
需要异常处理CompletableFuture
需要组合多个异步任务CompletableFuture
需要灵活控制线程池CompletableFuture
数据处理流水线并行流

八、常见问题与避坑

8.1 忘记 join

// 错误:没有获取结果
CompletableFuture.supplyAsync(() -> "hello")
    .thenApply(s -> s.toUpperCase());
// 任务可能还没执行完就返回了

// 正确:join 等待完成
CompletableFuture.supplyAsync(() -> "hello")
    .thenApply(s -> s.toUpperCase())
    .join();

8.2 不处理异常

// 错误:不处理异常,任务失败静默
CompletableFuture
    .supplyAsync(() -> fetchData())
    .thenApply(data -> process(data));
// 如果 fetchData 失败,没有日志

// 正确:处理异常
CompletableFuture
    .supplyAsync(() -> fetchData())
    .exceptionally(ex -> {
        log.error("Fetch data failed", ex);
        return null;
    })
    .thenAccept(data -> process(data));

8.3 阻塞主线程

// 错误:在异步任务中调用 get()/join()
CompletableFuture
    .supplyAsync(() -> {
        CompletableFuture<String> inner = fetchAsync();
        return inner.get();  // 阻塞!
    });

// 正确:使用 thenCompose 扁平化
CompletableFuture
    .supplyAsync(() -> fetchAsync())
    .thenCompose(result -> processAsync(result));

【学习小结】

本篇文章的核心要点:

  1. Future 的局限:无法链式调用、无法组合、无法处理异常
  2. CompletableFuture:支持链式编程、组合、异常处理的异步工具
  3. thenApply:同步转换,输入 T 返回 U
  4. thenCompose:异步扁平化,输入 T 返回 CompletableFuture<U>
  5. thenCombine:组合两个异步任务的结果
  6. 异常处理:exceptionally、handle、whenComplete 三种方式
  7. allOf/anyOf:等待所有或任意一个完成
  8. 线程池选择:*Async 方法可指定线程池,thenApply 继承上一个线程