ConcurrentHashMap put 流程

面试官问:"ConcurrentHashMap 的 put 方法是怎么实现的?"

候选人小张答:"先用 CAS 插入,失败了再用 synchronized。"

面试官追问:"具体是怎么判断的?CAS 和 synchronized 分别在什么情况下使用?"

小张支支吾吾答不上来。

【面试官心理】 put 流程是理解 ConcurrentHashMap 的核心。能够说清楚 CAS 和 synchronized 分别在什么情况下使用的候选人,说明真正理解了 ConcurrentHashMap 的并发设计。

一、put 方法入口 🔴

1.1 公开方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

public V putIfAbsent(K key, V value) {
    return putVal(key, value, true);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key/value 不能为 null(与 HashMap 不同)
    if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode());
    int binCount = 0;

    for (Node<K, V>[] tab = table; ; ) {
        Node<K, V> f;
        int n, i, fh;

        // 情况 1:table 未初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 情况 2:桶为空,CAS 插入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
                break;
        }

        // 情况 3:正在扩容,协助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);

        // 情况 4:正常插入,需要 synchronized
        else {
            // ... synchronized 块
        }
    }
    addCount(1L, binCount);
    return null;
}

1.2 put 流程图

graph TD
    A["put(key, value)"] --> B["spread(hash)"]
    B --> C{"table == null?"]
    C -->|是| D["initTable() 初始化"]
    C -->|否| E["计算 index"]
    D --> E
    E --> F{"桶为空?"]
    F -->|是| G["CAS 插入"]
    F -->|否| H{"桶是 ForwardingNode?"]
    H -->|是| I["helpTransfer() 协助扩容"]
    H -->|否| J["synchronized 锁桶"]
    I --> E
    J --> K["链表插入或更新"]
    K --> L{"链表长度 >= 8?"]
    L -->|是| M["treeifyBin 树化"]
    L -->|否| N["遍历结束"]
    G --> O["addCount 计数"]
    M --> O
    N --> O
    O --> P["返回"]

二、spread 方法(哈希扰动)🔴

2.1 spread 实现

// ConcurrentHashMap 的哈希计算
static final int spread(int h) {
    // 和 HashMap 类似,但多了一步 & HASH_BITS
    return (h ^ (h >>> 16)) & HASH_BITS;
}

// HASH_BITS = 0x7fffffff
// 目的是把 hash 变成正数,避免负数导致的问题

2.2 为什么需要 spread

// hashCode() 可能返回负数
// 例如 String 的 hashCode 有可能返回负数

// 如果 hash 是负数
// (n - 1) & hash 在某些情况下会出问题

// spread 操作:
// 1. 高位和低位混合(和 HashMap 一样)
// 2. & HASH_BITS 确保结果是正数

三、CAS 插入 🔴

3.1 桶为空时的插入

// 情况 2:桶为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    // f = tab[i],如果为 null,说明桶是空的
    // CAS 尝试插入新节点

    if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
        break;  // 插入成功,退出循环
}

3.2 tabAt 和 casTabAt

// 获取数组元素(volatile 读取)
static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
    return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
}

// CAS 设置数组元素
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i,
                                     Node<K, V> c, Node<K, V> v) {
    return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
}
💡

tabAt 使用 volatile 读取,保证看到其他线程的最新写入。casTabAt 使用 CAS,保证插入的原子性。

3.3 CAS 失败重试

// CAS 失败,说明其他线程已经插入了
// for 循环会继续,重新获取 tab[i]
// 可能的情况:
// 1. 其他线程在同一个桶插入了节点
// 2. 其他线程触发了扩容

for (Node<K, V>[] tab = table; ; ) {
    // 重新检查,重新尝试
}

四、synchronized 锁桶 🟡

4.1 为什么需要 synchronized

// 桶不为空时,CAS 插入会失败
// 需要用 synchronized 锁住桶,然后进行操作

else {
    // f 是桶的头节点
    synchronized (f) {  // 只锁住这个桶,不影响其他桶
        // 检查桶是否变化
        if (tabAt(tab, i) == f) {
            // 桶没有变化,继续操作
        }
    }
}

4.2 synchronized 块内部

synchronized (f) {
    if (tabAt(tab, i) == f) {
        // fh 是 f 的 hash
        if (fh >= 0) {  // 普通链表节点
            binCount = 1;
            for (Node<K, V> e = f; ; ++binCount) {
                if (e.hash == hash &&
                    keyEquals(key, e.key)) {
                    // 找到相同的 key,更新 value
                    V oldVal = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                    return oldVal;
                }
                Node<K, V> pred = e;
                if ((e = e.next) == null) {
                    // 遍历到链表尾部,插入新节点
                    pred.next = new Node<K, V>(hash, key, value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {  // 红黑树节点
            // 红黑树插入
            TreeBin<K, V> t = (TreeBin<K, V>) f;
            TreeNode<K, V> r, p;
            if ((r = t.root) != null &&
                (p = r.findTreeNode(hash, key, null)) != null) {
                V oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
                return oldVal;
            }
            else {
                // 插入新节点
                p = new TreeNode<>(hash, key, value, null);
                if ((r = t.root) != null)
                    p.putTreeVal(r, p);
                // ...
            }
        }
    }
}

4.3 ❌ 错误示范

候选人原话:"ConcurrentHashMap 用 CAS 保证线程安全,所以不需要锁。"

问题诊断

  • 忽略了 synchronized 在链表/红黑树操作中的作用
  • 不理解 CAS 在高竞争下的失败重试
  • 不理解为什么需要 synchronized

面试官内心 OS:"这个候选人可能只是看过表面,没有深入理解并发场景的复杂性。"

【面试官心理】 ConcurrentHashMap 的 put 流程中,CAS 用于初始化和简单插入,synchronized 用于链表/红黑树的修改。两者结合,既保证了性能,又保证了线程安全。

五、红黑树插入 🟡

5.1 TreeBin 的特殊处理

else if (f instanceof TreeBin) {
    TreeBin<K, V> t = (TreeBin<K, V>) f;
    TreeNode<K, V> r, p;

    // 在红黑树中查找
    if ((r = t.root) != null &&
        (p = r.findTreeNode(hash, key, null)) != null) {
        // 找到了,更新
        V oldVal = p.val;
        if (!onlyIfAbsent)
            p.val = value;
        return oldVal;
    }
    else {
        // 没找到,插入新节点
        p = new TreeNode<>(hash, key, value, null);
        if ((r = t.root) != null)
            p.putTreeVal(r, p);
        // ...
    }
}

5.2 TreeBin vs TreeNode

// TreeBin 是红黑树的容器,包装了 TreeNode
// TreeBin 内部有自己的锁机制

static final class TreeBin<K, V> extends Node<K, V> {
    TreeNode<K, V> root;
    volatile TreeNode<K, V> first;
    volatile int waiterState;  // 等待者状态
    int lockState;              // 锁状态
    // ...
}

// TreeNode 是红黑树的节点
static final class TreeNode<K, V> extends Node<K, V> {
    TreeNode<K, V> parent;
    TreeNode<K, V> left;
    TreeNode<K, V> right;
    TreeNode<K, V> prev;
    boolean red;
    // ...
}

六、树化阈值检查 🟡

6.1 树化条件

if (binCount != 0) {
    // binCount 是遍历的节点数
    if (binCount >= TREEIFY_THRESHOLD)  // 8
        treeifyBin(tab, i);  // 树化
    if (oldVal != null)
        return oldVal;
    break;
}

6.2 treeifyBin 方法

final void treeifyBin(Node<K, V>[] tab, int index) {
    Node<K, V> b;
    int n, sc;
    if (tab != null) {
        // 如果容量 < 64,优先扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)  // 64
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null &&
                 b.hash >= 0) {  // 链表节点
            // 链表转红黑树
            TreeNode<K, V> hd = null, tl = null;
            do {
                TreeNode<K, V> p = replacementTreeNode(b, null);
                // ... 转换逻辑
            } while ((b = b.next) != null);
            // ...
        }
    }
}

七、计数更新 🟡

7.1 addCount 方法

private final void addCount(long x, int check) {
    CounterCell[] as;
    long b, s;

    // 1. 尝试 CAS 更新 baseCount
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        // 2. CAS 失败,使用 CounterCell
        CounterCell a;
        long v;
        int m;
        boolean uncontended = true;

        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 3. 还是失败,自旋
            fullAddCount(x, uncontended);
            return;
        }
        s = b + x;
    }

    // 4. 检查是否需要扩容
    if (check >= 0) {
        Node<K, V>[] tab, nt;
        int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            if (sc < 0) break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1)) {
                // 扩容
                nt = ((Node<K, V>[]) new Node<?, ?>[n << 1]);
                transfer(tab, nt);
                // ...
                break;
            }
        }
    }
}

7.2 sizeCtl 的作用

// sizeCtl 是控制并发操作的关键变量
// -1:表示正在初始化
// -n:表示正在扩容(-n = -(1 + nThreads))
// 正数:表示扩容阈值

八、协助扩容 🟡

8.1 扩容检测

// 情况 3:检测到 ForwardingNode
else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

// MOVED = -1(ForwardingNode 的 hash)

8.2 helpTransfer 方法

final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
    Node<K, V>[] nextTab;
    int sc;

    if (tab != null && f instanceof ForwardingNode &&
        (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
        // 协助扩容
        while (nextTab == nextTable && table == tab &&
               sc == sizeCtl) {
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1)) {
                transfer(tab, nextTab);
                return nextTab;
            }
        }
    }
    return table;
}
💡

当一个线程发现其他线程正在扩容时,它会协助扩容,而不是等待。这是 ConcurrentHashMap 的一个优化,减少了扩容的停顿时间。

九、并发场景分析 🟡

9.1 线程 A 和线程 B 同时 put 到同一个桶

// 初始状态:桶为空

// 线程 A 和线程 B 同时执行
// 1. tabAt(tab, i) 都读到 null
// 2. 线程 A 先 CAS 成功,桶变成 NodeA
// 3. 线程 B 的 CAS 失败

// 线程 B 继续循环:
// 1. tabAt(tab, i) 读到 NodeA(不为空)
// 2. 进入 synchronized (NodeA) 块
// 3. 在链表尾部插入节点

9.2 线程 A put,线程 B get

// 线程 A 在 synchronized 块中修改链表
synchronized (f) {
    // 修改链表
    e.value = value;  // 更新值
}

// 线程 B 同时 get
public V get(Object key) {
    Node<K, V>[] tab;
    Node<K, V> e, p;
    int n, eh;
    K ek;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & hash)) != null) {
        // tabAt 是 volatile 读取
        // 能看到线程 A 的修改
        if ((ek = e.key) == key || (ek != null && key.equals(ek)))
            return e.value;
    }
    return null;
}

9.3 volatile 保证可见性

// Node.value 是 volatile 的
static class Node<K, V> implements Map.Entry<K, V> {
    volatile V value;  // volatile 保证可见性
    // ...
}

// 所以 get 一定能看到 put 的最新值

十、面试高频追问 🟡

10.1 第一层追问

面试官:"ConcurrentHashMap 的 put 方法什么时候用 CAS,什么时候用 synchronized?"

候选人:...

正确回答

  • CAS:桶为空时尝试插入
  • synchronized:桶不为空时(链表/红黑树的插入、更新、删除)

10.2 第二层追问

面试官:"synchronized 只锁住一个桶,会不会影响其他线程操作其他桶?"

候选人:...

正确回答:不会。synchronized 只锁住一个桶(链表头),其他线程可以同时操作不同的桶,实现真正的并发。

10.3 第三层追问

面试官:"ConcurrentHashMap 的 key/value 能为 null 吗?"

候选人:...

正确回答:不能。如果为 null,put 方法会抛出 NullPointerException。这是和 HashMap 的主要区别之一。

10.4 第四层追问

面试官:"ConcurrentHashMap 的 size() 方法是怎么工作的?"

候选人:...

正确回答

  • 使用 CounterCell 分散计数
  • 多个线程并发更新不同的 CounterCell
  • 累加时遍历所有 CounterCell 求和
  • 不需要加锁,性能好

【学习小结】 ConcurrentHashMap put 流程要点:

  1. spread() 计算哈希(扰动 + 保证正数)
  2. 桶为空 → CAS 插入
  3. 桶为 ForwardingNode → 协助扩容
  4. 桶为普通节点 → synchronized 锁桶,链表/红黑树操作
  5. binCount >= 8 → 树化
  6. addCount() 更新计数,可能触发扩容