CyclicBarrier 原理

候选人小魏在面试字节 P6 时,面试官问道:

"CyclicBarrier 是什么?它和 CountDownLatch 有什么区别?"

小魏说:"CyclicBarrier 可以循环使用..."面试官追问:"它是怎么实现循环的?generation 字段是什么?"

小魏答不上来。面试官继续:"如果某个线程在等待期间被中断,barrier 会怎样?"

小魏彻底卡住了...

一、核心问题:CyclicBarrier 原理 🔴

1.1 问题拆解

第一层:概念与场景(是什么?)
  "CyclicBarrier 的典型使用场景是什么?"
  考察点:多线程并行后汇总、互相等待

第二层:实现机制(怎么做到?)
  "CyclicBarrier 是怎么实现可循环的?generation 字段的作用?"
  考察点:ReentrantLock + Condition、generation 递增

第三层:Broken Barrier(异常情况怎么处理?)
  "如果某个线程在 await() 期间中断,会发生什么?"
  考察点:broken barrier、BrokenBarrierException

第四层:与 CountDownLatch 的选择(怎么选?)
  "什么场景用 CyclicBarrier?什么场景用 CountDownLatch?"
  考察点:工程决策、场景匹配

1.2 ❌ 错误示范

候选人原话 A:"CyclicBarrier 和 CountDownLatch 都是用来等待线程的,没什么区别。"

问题诊断:两者有本质区别。CyclicBarrier 是多方互相等待(线程之间互相等待),CountDownLatch 是一方等待另一方(主线程等待子任务)。

候选人原话 B:"CyclicBarrier 可以无限循环使用。"

问题诊断:CyclicBarrier 在所有 parties 到达后自动重置,可以复用。但如果有线程因异常离开,barrier 会进入 broken 状态,不能继续使用。

1.3 标准回答

P5 级别:使用场景

典型场景:多线程并行处理后汇总

CyclicBarrier barrier = new CyclicBarrier(N, () -> {
    // barrierAction: 所有 N 个线程到达后执行一次
    System.out.println("所有线程已到达,开始汇总...");
    aggregate();
});

for (int i = 0; i < N; i++) {
    final int id = i;
    new Thread(() -> {
        process(id);          // 并行处理
        barrier.await();       // 等待其他线程
        continueWithResult();  // 所有线程到达后继续
    }).start();
}

常见使用场景

  • 数据并行计算:多线程分别处理数据分片,所有分片完成后汇总
  • 游戏加载:等待所有资源加载完成后进入游戏主界面
  • 阶段化任务:多线程完成阶段一,然后同时进入阶段二

P6 级别:ReentrantLock + Condition 实现

核心实现

CyclicBarrier 使用 ReentrantLock 和 Condition,不依赖 AQS 的 CLH 队列(而是自己维护等待线程):

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();  // 条件队列
    private final int parties;           // 参与方数量
    private final Runnable barrierCommand;  // 汇合后执行的动作
    private int count;                   // 剩余等待数量
    private Generation generation;        // 代次:控制循环重置

    private static class Generation {
        boolean broken = false;
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
        this.generation = new Generation();
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        lock.lock();
        try {
            Generation g = generation;

            // 检查 barrier 是否损坏
            if (g.broken) {
                throw new BrokenBarrierException();
            }

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // 最后一个到达
                Runnable command = barrierCommand;
                if (command != null) {
                    command.run();  // 执行 barrierAction
                }
                nextGeneration();   // 重置 barrier
                return 0;
            }

            // 不是最后一个:等待
            for (;;) {
                try {
                    trip.await();  // 等待其他线程
                    break;
                } catch (InterruptedException e) {
                    if (generation == g && !g.broken) {
                        breakBarrier();
                        throw e;
                    }
                }
            }

            // 被唤醒后检查是否损坏
            if (g.broken) {
                throw new BrokenBarrierException();
            }

            return index;
        } finally {
            lock.unlock();
        }
    }

    // 重置 barrier,开始新一代
    private void nextGeneration() {
        trip.signalAll();  // 唤醒所有等待线程
        count = parties;   // 重置计数器
        generation = new Generation();  // 新代次
    }

    // 标记 barrier 为损坏状态
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
}

P7 级别:generation 机制与 broken 状态

generation 的核心作用

generation 是 CyclicBarrier 循环机制的关键。当所有 parties 到达时,nextGeneration() 创建一个新的 Generation 对象,旧 Generation 失效。这使得:

  1. 等待线程能区分"当前轮的唤醒"和"之前轮次的等待"
  2. Broken Barrier 的检测依赖于 generation 对象引用比较
// 线程被唤醒后的检查
if (generation != g) {  // generation 已更新 → 当前轮的唤醒
    return index;
}
if (g.broken) {         // barrier 已损坏
    throw new BrokenBarrierException();
}

Broken Barrier 的触发条件

  1. 等待中的线程被 interrupt() 中断
  2. 调用 reset() 方法
  3. 超时等待超时

当 barrier 进入 broken 状态后,所有后续的 await() 调用立即抛出 BrokenBarrierException

为什么需要 generation 而不是简单的计数器重置?

考虑这个竞态条件:

T1: index=0, 执行 nextGeneration(), count=parties, generation=gen2
T2: 在 signalAll() 之前执行 await()
T3: signalAll() 唤醒 T2

如果没有 generation,T2 可能在 barrier 重置完成前被 signal(看到 count=parties),导致 T2 以为自己完成了这一轮。如果有 generation 比较,T2 会发现 generation 已变化,但 broken=false,允许它继续进入下一轮。

实际上更微妙:signalAll() 和 count 重置之间有一个时间窗口,generation 的对象引用比较确保了这个窗口不会导致错误行为。

【面试官心理】 这道题我能问到 P7 级别,是因为 generation 机制是 CyclicBarrier 循环特性的核心设计。能解释 generation 对象引用的作用而非简单说"计数器重置"的候选人说明他真正理解了这个设计。能说清 broken barrier 的触发条件和处理方式的候选人说明他有错误处理意识。

1.4 追问升级

追问 1:reset() 和自然汇合重置有什么区别?

  • 自然汇合重置(所有 parties 到达):nextGeneration() 唤醒所有等待线程,每个线程正常返回
  • 手动 reset():立即进入 broken 状态,唤醒所有等待线程并抛出 BrokenBarrierException

reset() 通常用于错误恢复。

追问 2:CyclicBarrier 的并发性能如何?

CyclicBarrier 使用单个 ReentrantLock,所有操作(await, signalAll)都需要获取锁。在高竞争场景下,所有线程同时等待同一个锁,可能成为瓶颈。

JDK 7 的 Phaser 在这方面更好——它支持更细粒度的同步模式。

二、与 CountDownLatch 的详细对比 🟡

2.1 决策树

需要等待线程完成吗?
├── 一方等另一方(N 个子任务完成后主线程继续)
│   └── CountDownLatch
└── 多方互相等待(N 个线程互相等待,到齐后同时继续)
    └── CyclicBarrier

2.2 使用对比

// 场景 1:主线程等待子任务(CountDownLatch)
// 启动 10 个爬虫,主线程等待全部完成
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        crawl();
        latch.countDown();
    }).start();
}
latch.await();  // 主线程等待

// 场景 2:多线程互相等待(CyclicBarrier)
// 10 个工人分别处理数据,完成后同时进入下一阶段
CyclicBarrier barrier = new CyclicBarrier(10, () -> {
    System.out.println("所有工人完成阶段一,进入阶段二");
});
for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        processPhaseOne();
        barrier.await();  // 互相等待
        processPhaseTwo();  // 完成后同时进入阶段二
    }).start();
}

三、生产避坑

3.1 线程异常离开导致 barrier 永久等待

CyclicBarrier barrier = new CyclicBarrier(3);

// 线程 1
try {
>    doWork();
>    barrier.await();  // 正常等待
>} catch (Exception e) {
    barrier.reset();  // 错误处理:reset 会让其他线程抛 BrokenBarrierException
}

// 如果线程 1 抛出异常但未 catch
// 线程 2 和 3 会永远等待(barrier 进入 broken 状态)

解决方案:使用 await(timeout, unit) 并在超时后处理:

int awaitCount = 0;
try {
    awaitCount = barrier.await(10, TimeUnit.SECONDS);
} catch (BrokenBarrierException e) {
    barrier.reset();
} catch (TimeoutException e) {
    barrier.reset();
    // 处理超时
}

3.2 共享 CyclicBarrier 实例的线程安全

CyclicBarrier 内部使用 ReentrantLock 保证线程安全,但 barrierCommand(汇合后执行的动作)需要自己保证线程安全。

💡

面试加分点:能说出"JDK 7 的 Phaser 是更灵活的屏障,可以动态注册参与方数量、多个同步点,且性能比 CyclicBarrier 更好(分段的 Phaser 树结构)",说明他对 JDK 7+ 的并发工具有跟进。

⚠️

面试陷阱:被问到"CyclicBarrier 的parties 和 count 有什么区别",很多人会说"一样的"。实际上,parties 是构造时传入的固定值,count 是动态递减的计数器。当 count 归零并重置后,count 恢复为 parties。