ConcurrentHashMap put 流程

面试官问:"ConcurrentHashMap 的 put 方法流程是什么?"

候选人小童答:"先定位桶,然后加锁写入。"

面试官追问:"JDK 8 用的什么锁?"

小童说:"synchronized?"

面试官追问:"为什么用 synchronized 而不是 ReentrantLock?"

小童答不上来。

【面试官心理】 这道题考查的是候选人对 ConcurrentHashMap JDK 8 实现的理解深度。能说出 synchronized 的优化理由(偏向锁、轻量级锁、JIT 优化)的候选人,说明对并发编程有深入了解。

一、put 完整流程 🔴

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();

    // 1. 计算 hash(扰动)
    int hash = spread(key.hashCode());

    // 2. binCount:用于判断是否需要树化
    int binCount = 0;

    // 3. 自旋 + CAS
    for (Node<K, V>[] tab = table; ; ) {
        Node<K, V> f;
        int n, i, fh;

        // 3.1 第一次 put:初始化 table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 3.2 CAS 检查桶是否为空
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 空桶:CAS 尝试插入
            if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
                break;
        }

        // 3.3 正在扩容:协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);

        // 3.4 桶不为空:synchronized 加锁
        else {
            V oldVal = null;

            // 🔒 锁住当前桶的头节点
            synchronized (f) {
                // 双重检查:确保头节点没变
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        // 链表处理
                        binCount = 1;
                        for (Node<K, V> e = f; ; ++binCount) {
                            if (e.hash == hash &&
                                ((k = e.key) == key || key.equals(k))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K, V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K, V>(hash, key, value, null);
                                break;
                            }
                        }
                    } else if (f instanceof TreeBin) {
                        // 红黑树处理
                        Node<K, V> p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value);
                        oldVal = p.val;
                    }
                }
            }

            if (binCount != 0) {
                // 3.5 判断是否需要树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }

    // 4. 计数 + 检查扩容
    addCount(1L, binCount);
    return null;
}

二、为什么用 synchronized 🟡

// JDK 7 使用 ReentrantLock
Segment<K, V> s = segments[segmentIndex];
s.lock();

// JDK 8 改用 synchronized
synchronized (f) { // 锁住当前桶
    // ...
}

// 改进原因:
// 1. JDK 6+ 对 synchronized 做了大量优化
//    - 偏向锁(无竞争时几乎零开销)
//    - 轻量级锁(轻度竞争时自旋)
//    - 适应性自旋(根据历史统计调整)
// 2. synchronized 不需要手动释放锁
// 3. JVM 内置,更好的集成
// 4. 锁粒度更细(桶级别,而非段级别)

三、CAS + synchronized 的配合 🟡

// 读操作:完全无锁
public V get(Object key) {
    Node<K, V>[] tab;
    Node<K, V> e, p;
    int n, eh;
    K ek;

    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & (h = spread(key.hashCode())))) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        } else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            // 遍历链表
        }
    }
    return null;
}

// 写操作:CAS 尝试 + synchronized 兜底
// 1. 先用 CAS 尝试(无锁,乐观)
// 2. CAS 失败(竞争激烈),用 synchronized 加锁

四、扩容机制 🟡

// JDK 8 支持多线程并发扩容
// helpTransfer:帮助正在扩容的线程

final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
    Node<K, V>[] nextTab;
    int sc;

    if (tab != null && f instanceof ForwardingNode &&
        (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
        // 检查是否需要帮助扩容
        // 每个线程处理一批桶(16个)
        // ...
    }
    return tab;
}

五、追问升级

面试官:"ConcurrentHashMap 的 size() 方法是怎么实现的?"

// JDK 8 之前:累加所有 Segment 的 count
// 每个 Segment 独立计数
// 问题是:需要锁住所有 Segment 才能准确计数

// JDK 8:LongAdder + CAS
private final LongAdder counter = new LongAdder();

public int size() {
    long sum = 0;
    for (LongAdder cell : counter.cells) {
        sum += cell.sum();
    }
    return (sum >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) sum;
}

// put 时更新计数
addCount(1L, binCount);