结构化并发(Structured Concurrency)

Java 21 引入了结构化并发(Structured Concurrency),这是一个让并发编程更安全、更易维护的特性。

我之前写异步代码的时候,经常遇到这样的问题:主任务启动了多个子任务,但主任务取消了,子任务还在后台跑,导致资源泄漏或者奇怪的 bug。

结构化并发就是为了解决这个问题:让子任务的生命周期与作用域绑定

今天我们就来把结构化并发彻底讲透。

一、为什么需要结构化并发

1.1 传统并发的问题

// 传统方式:子任务可能泄漏
Future<String> f1 = executor.submit(() -> callService1());
Future<String> f2 = executor.submit(() -> callService2());
// 主线程取消了
// f1 和 f2 还在后台跑!

问题:

  1. 子任务泄漏:父任务取消后,子任务还在跑
  2. 错误处理复杂:需要手动管理多个 Future
  3. 线程生命周期不明确:不知道线程什么时候该结束

1.2 结构化并发的目标

结构化并发:

try (var scope = new StructuredTaskScope<T>()) {
    Future<A> f1 = scope.fork(task1);
    Future<B> f2 = scope.fork(task2);
    
    scope.join();  // 等待所有任务完成
    
    A a = f1.resultNow();
    B b = f2.resultNow();
}
// scope 关闭时,所有子任务自动取消

特点:

  • 子任务在 scope 内启动
  • scope 关闭时,子任务自动取消
  • 错误自动传播
  • 线程生命周期与代码结构一致

二、StructuredTaskScope

2.1 基本用法

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> callService1());
    Future<Integer> f2 = scope.fork(() -> callService2());
    
    scope.join();  // 等待所有子任务完成
    
    String result1 = f1.resultNow();
    Integer result2 = f2.resultNow();
    
    scope.throwIfFailed();  // 如果有失败,抛出异常
}
// scope 关闭后,子任务会被自动取消

2.2 ShutdownOnFailure:一个失败,全部取消

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> user = scope.fork(() -> fetchUser(id));
    Future<List<Order>> orders = scope.fork(() -> fetchOrders(id));
    
    scope.join();  // 等待所有完成
    
    // 如果任何一个失败,抛出异常
    scope.throwIfFailed();
    
    User u = user.resultNow();
    List<Order> os = orders.resultNow();
    return new UserOrders(u, os);
}
// 如果 fetchUser 失败,fetchOrders 会被自动取消

2.3 ShutdownOnSuccess:一个成功,取消其他

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> searchOnServer1(query));
    scope.fork(() -> searchOnServer2(query));
    scope.fork(() -> searchOnServer3(query));
    
    // 第一个成功返回后,其他任务被取消
    String result = scope.join();  // 返回第一个成功的结果
    return result;
}
// 最快的服务器返回后,其他服务器的任务被取消

三、【直观类比】

【直观类比】

结构化并发就像"团队会议":

传统并发:
  领导派了 10 个任务给 10 个人
  领导不干了,但 10 个人还在各自干
  
结构化并发:
  领导开了个会议室(StructuredTaskScope)
  10 个人在会议室里开会
  领导说散会,所有人立刻停手
  
会议室 = StructuredTaskScope
散会 = scope.close()

四、错误处理

4.1 异常传播

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> {
        throw new RuntimeException("Task 1 failed");
    });
    Future<String> f2 = scope.fork(() -> "Task 2");
    
    scope.join();
    scope.throwIfFailed();  // 抛出第一个异常
}
// 捕获异常
catch (ExecutionException e) {
    System.out.println("Failed: " + e.getCause().getMessage());
}

4.2 多个异常

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    scope.fork(() -> { throw new RuntimeException("Error 1"); });
    scope.fork(() -> { throw new RuntimeException("Error 2"); });
    
    scope.join();
    scope.throwIfFailed();
} catch (ExecutionException e) {
    // e.getCause() 只包含第一个异常
    // 其他异常被 suppressed
    Throwable[] suppressed = e.getCause().getSuppressed();
}

4.3 自定义 Scope

// 扩展 StructuredTaskScope
public class MyScope<T> extends StructuredTaskScope<T> {
    
    @Override
    protected void handleComplete(Future<T> future) {
        // 任务完成时的回调
        if (future.isDone()) {
            // 处理完成的任务
        }
    }
}

五、与虚拟线程结合

5.1 完美配合

结构化并发和虚拟线程是天然搭档:

// 使用虚拟线程 + 结构化并发
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> blockingCall1());
    Future<String> f2 = scope.fork(() -> blockingCall2());
    
    scope.join();
    scope.throwIfFailed();
    
    return f1.resultNow() + f2.resultNow();
}

5.2 为什么虚拟线程需要结构化并发

// 虚拟线程的阻塞不会阻塞 Carrier
// 但子任务的取消需要结构化并发来管理

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // 启动多个虚拟线程任务
    scope.fork(() -> fetchData1());
    scope.fork(() -> fetchData2());
    
    scope.join();
    // 如果主任务失败,scope 会取消所有子任务
}

六、生产最佳实践

6.1 ✅ 正确示例:并行获取多个数据

public UserOrders getUserOrders(Long userId) throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<User> userFuture = scope.fork(() -> userRepository.findById(userId));
        Future<List<Order>> ordersFuture = scope.fork(() -> orderRepository.findByUserId(userId));
        
        scope.join();  // 等待所有查询完成
        scope.throwIfFailed();  // 有失败则抛出
        
        return new UserOrders(
            userFuture.resultNow(),
            ordersFuture.resultNow()
        );
    }
}

6.2 ✅ 正确示例:超时控制

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    scope.fork(() -> callWithTimeout(service1(), Duration.ofSeconds(1)));
    scope.fork(() -> callWithTimeout(service2(), Duration.ofSeconds(1)));
    
    scope.joinUntil(Instant.now().plusSeconds(2));  // 最多等 2 秒
    scope.throwIfFailed();
}

6.3 ❌ 错误示例:在 scope 外使用 fork 的 Future

StructuredTaskScope.ShutdownOnFailure scope = new StructuredTaskScope.ShutdownOnFailure().start();
Future<String> f1 = scope.fork(() -> "task");

// ❌ scope 关闭后,f1 可能已经被取消
scope.close();

// ❌ 在 scope 关闭后访问结果
f1.resultNow();  // 可能抛异常

正确做法:

try (var scope = new StructuredTaskScope.ShutdownOnFailure().start()) {
    Future<String> f1 = scope.fork(() -> "task");
    scope.join();
    String result = f1.resultNow();  // 在 scope 关闭前获取
}

七、面试追问链

第一层:基础概念

面试官问:"结构化并发是什么?"

Java 21 引入的特性,通过 StructuredTaskScope 把子任务的生命周期绑定到代码块。scope 关闭时,子任务自动取消。

第二层:ShutdownOnFailure vs ShutdownOnSuccess

面试官问:"ShutdownOnFailure 和 ShutdownOnSuccess 有什么区别?"

ShutdownOnFailure:一个子任务失败时,取消其他所有子任务。ShutdownOnSuccess:第一个子任务成功时,取消其他子任务。适用场景不同。

第三层:与虚拟线程的关系

面试官问:"结构化并发和虚拟线程有什么关系?"

结构化并发需要虚拟线程来发挥最大效果。虚拟线程的挂起机制让结构化并发的调度更高效。两者结合可以写出既安全又高效的并发代码。

【学习小结】

  • StructuredTaskScope 管理子任务的生命周期
  • ShutdownOnFailure:一个失败,全部取消
  • ShutdownOnSuccess:一个成功,取消其他
  • scope 关闭时自动取消所有子任务
  • 与虚拟线程天然配合
  • 避免子任务泄漏问题