ConcurrentLinkedQueue 原理

面试官问:"ConcurrentLinkedQueue 是怎么实现线程安全的?"

候选人小何答:"用 CAS。"

面试官追问:"具体是怎么实现的?"

小何答不上来。

【面试官心理】 ConcurrentLinkedQueue 的无锁实现是并发队列的经典设计。能说出 CAS + 自旋 + HOPS 优化的候选人,说明对并发编程有深入理解。

一、无锁队列实现 🔴

1.1 核心数据结构

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, Serializable {

    // 头尾节点
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;

    // Node 节点
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }
}

1.2 CAS 操作

// JDK 中的 CAS 操作
// CAS:Compare-And-Swap,原子操作
// 成功条件:当前值 == 期望值
// 如果成功:更新为新值
// 如果失败:重试

// Java 中的 CAS:
// Unsafe.compareAndSwapObject()
// AtomicInteger.compareAndSet()
// ConcurrentLinkedQueue 内部使用 CAS

二、offer 操作 🟡

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();

    Node<E> n = new Node<>(e);
    for (Node<E> t = tail, p = t; ; ) { // 自旋
        Node<E> q = p.next;

        if (q == null) {
            // q 是尾节点,尝试 CAS 添加
            if (p.casNext(null, n)) {
                // 添加成功
                if (p != t) // p 不是尾节点,更新 tail
                    casTail(t, n);
                return true;
            }
        } else if (p == q) {
            // p 已经过时(其他线程修改了)
            // 从 head 重新开始
            p = (t != (t = tail)) ? t : head;
        } else {
            // 推进指针
            if ((p != t && t != (t = tail)) || p.next != q)
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
}

2.1 HOPS 优化

// HOPS:tail 和 head 不是每次都更新
// 只有当 tail 距离实际尾节点超过一定距离时才更新

// tail 优化:
// 不每次 offer 都更新 tail
// 而是每 N 次(大约一次)才更新
// 减少 CAS 操作,提高性能

// 原因:
// 如果每次都更新 tail,offer 和 poll 的竞争会很激烈
// 用 HOPS 可以减少竞争

三、poll 操作 🟡

public E poll() {
    restart: for (Node<E> h = head, p = h, q = p.next; ; ) {
        E item = p.item;

        if (item != null && p.casItem(item, null)) {
            // 成功获取 item
            if (p != h)
                updateHead(h, q); // 更新 head
            return item;
        }

        if (q == null) {
            // 队列为空
            updateHead(h, p);
            return null;
        }

        if (p == q) // p 已过时
            continue restart;

        p = q;
        q = p.next;
    }
}

private final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        // 将旧 head 的 item 设为 null,帮助 GC
        lazySetNext(h, null);
}

四、与 BlockingQueue 对比 🟡

维度ConcurrentLinkedQueueBlockingQueue
队列满时返回 false 或抛异常阻塞等待
队列空时返回 null 或抛异常阻塞等待
实现机制CAS 无锁Lock + Condition
适用场景高并发、低延迟生产者-消费者模型
APIoffer/pollput/take
// ConcurrentLinkedQueue:非阻塞
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item");
String item = queue.poll();

// BlockingQueue:阻塞
BlockingQueue<String> bqueue = new LinkedBlockingQueue<>();
bqueue.put("item");    // 队列满时阻塞
String item = bqueue.take(); // 队列空时阻塞

五、追问升级

面试官:"ConcurrentLinkedQueue 的 size() 是精确的吗?"

// 不是精确的!
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        if (p.item != null)
            ++count;
    }
    return count;
}

// 因为遍历过程中,元素可能正在被添加或移除
// concurrent 包中大多数集合的 size() 都不是精确的

// 如果需要精确计数:自己维护一个 AtomicInteger/LongAdder