#ConcurrentLinkedQueue 原理
面试官问:"ConcurrentLinkedQueue 是怎么实现线程安全的?"
候选人小何答:"用 CAS。"
面试官追问:"具体是怎么实现的?"
小何答不上来。
【面试官心理】 ConcurrentLinkedQueue 的无锁实现是并发队列的经典设计。能说出 CAS + 自旋 + HOPS 优化的候选人,说明对并发编程有深入理解。
#一、无锁队列实现 🔴
#1.1 核心数据结构
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, Serializable {
// 头尾节点
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
// Node 节点
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
}#1.2 CAS 操作
// JDK 中的 CAS 操作
// CAS:Compare-And-Swap,原子操作
// 成功条件:当前值 == 期望值
// 如果成功:更新为新值
// 如果失败:重试
// Java 中的 CAS:
// Unsafe.compareAndSwapObject()
// AtomicInteger.compareAndSet()
// ConcurrentLinkedQueue 内部使用 CAS#二、offer 操作 🟡
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<>(e);
for (Node<E> t = tail, p = t; ; ) { // 自旋
Node<E> q = p.next;
if (q == null) {
// q 是尾节点,尝试 CAS 添加
if (p.casNext(null, n)) {
// 添加成功
if (p != t) // p 不是尾节点,更新 tail
casTail(t, n);
return true;
}
} else if (p == q) {
// p 已经过时(其他线程修改了)
// 从 head 重新开始
p = (t != (t = tail)) ? t : head;
} else {
// 推进指针
if ((p != t && t != (t = tail)) || p.next != q)
p = (p != t && t != (t = tail)) ? t : q;
}
}
}#2.1 HOPS 优化
// HOPS:tail 和 head 不是每次都更新
// 只有当 tail 距离实际尾节点超过一定距离时才更新
// tail 优化:
// 不每次 offer 都更新 tail
// 而是每 N 次(大约一次)才更新
// 减少 CAS 操作,提高性能
// 原因:
// 如果每次都更新 tail,offer 和 poll 的竞争会很激烈
// 用 HOPS 可以减少竞争#三、poll 操作 🟡
public E poll() {
restart: for (Node<E> h = head, p = h, q = p.next; ; ) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 成功获取 item
if (p != h)
updateHead(h, q); // 更新 head
return item;
}
if (q == null) {
// 队列为空
updateHead(h, p);
return null;
}
if (p == q) // p 已过时
continue restart;
p = q;
q = p.next;
}
}
private final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 将旧 head 的 item 设为 null,帮助 GC
lazySetNext(h, null);
}#四、与 BlockingQueue 对比 🟡
| 维度 | ConcurrentLinkedQueue | BlockingQueue |
|---|---|---|
| 队列满时 | 返回 false 或抛异常 | 阻塞等待 |
| 队列空时 | 返回 null 或抛异常 | 阻塞等待 |
| 实现机制 | CAS 无锁 | Lock + Condition |
| 适用场景 | 高并发、低延迟 | 生产者-消费者模型 |
| API | offer/poll | put/take |
// ConcurrentLinkedQueue:非阻塞
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();
// BlockingQueue:阻塞
BlockingQueue<String> bqueue = new LinkedBlockingQueue<>();
bqueue.put("item"); // 队列满时阻塞
String item = bqueue.take(); // 队列空时阻塞#五、追问升级
面试官:"ConcurrentLinkedQueue 的 size() 是精确的吗?"
// 不是精确的!
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) {
if (p.item != null)
++count;
}
return count;
}
// 因为遍历过程中,元素可能正在被添加或移除
// concurrent 包中大多数集合的 size() 都不是精确的
// 如果需要精确计数:自己维护一个 AtomicInteger/LongAdder