ConcurrentHashMap 扩容机制

面试官问:"ConcurrentHashMap 在扩容的时候还能读写吗?"

候选人小葛说:"应该可以吧..."

面试官追问:"怎么保证读写不受扩容影响的?"

小葛答不上来。

【面试官心理】 ConcurrentHashMap 的并发扩容是 JDK 8 的重要改进。能说出 ForwardingNode 和协助扩容机制的候选人,说明对 JDK 源码有研究。

一、扩容期间读写机制 🔴

1.1 ForwardingNode

// 扩容过程中,已完成的桶会被替换为 ForwardingNode
static final class ForwardingNode<K, V> extends Node<K, V> {
    final Node<K, V>[] nextTable; // 指向新数组

    ForwardingNode(Node<K, V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    // find 方法重写:在新数组中查找
    Node<K, V> find(int h, Object k) {
        Node<K, V>[] tab = nextTable;
        outer: for (int i = 0; ; ) {
            // 在新数组中查找
            if (tab == null) break;
            // ...
        }
    }
}

1.2 读写操作遇到扩容

// get 操作:遇到 ForwardingNode,去新数组查找
public V get(Object key) {
    Node<K, V>[] tab;
    Node<K, V> e, p;
    // ...
    if ((e = tabAt(tab, i)) != null) {
        if (e.hash >= 0) { // 普通节点
            // 遍历
        } else if (e instanceof ForwardingNode) { // 扩容中!
            // 去新数组查找
            tab = ((ForwardingNode<K, V>) e).nextTable;
            continue; // 继续循环
        }
    }
    return null;
}

// put 操作:遇到 ForwardingNode,帮助扩容
else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f); // 帮助扩容

二、SIZECTL 状态控制 🔴

// SIZECTL:控制并发扩容的核心变量
// 不同的负值表示不同的状态

private transient volatile int sizeCtl;

// 状态含义:
// -1          :正在初始化
// -N (N>1)    :正在扩容,-(1 + 参与扩容的线程数)
// 正数        :下次扩容的阈值(capacity * loadFactor)

三、多线程协助扩容 🟡

final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
    Node<K, V>[] nextTab;
    int sc;

    if (tab != null && f instanceof ForwardingNode &&
        (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {
        // 循环,直到扩容完成
        for (int i = 0; ; ) {
            if ((sc = sizeCtl) < 0 &&
                tab == this.table && nextTab == nextTable &&
                Thread.yield()) // 让出 CPU,让其他线程完成扩容
                continue;

            if ((sc >>> RESIZE_STAMP_SHIFT) + 2 != sc)
                return tab;
        }
    }
    return tab;
}

四、扩容过程 🟡

private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
    int n = tab.length, stride;

    // 每个线程处理的桶数(最少 16)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;

    // 第一个线程:创建新数组
    if (nextTab == null) {
        Node<K, V>[] nt = (Node<K, V>[]) new Node[n << 1];
        nextTab = nt;
        nextTable = nextTab;
        transferIndex = n;
    }

    int nextn = nextTab.length;

    // 遍历当前桶,拆分为两个链表
    for (int i = 0; i < n; i++) {
        Node<K, V> f = tabAt(tab, i);
        if (f != null) {
            int runBit = f.hash & n; // 判断属于哪个新桶
            Node<K, V> lastRun = f;

            // 找到最后一个节点
            for (Node<K, V> e = f.next; e != null; e = e.next) {
                int eb = e.hash & n;
                if (eb != runBit) {
                    runBit = eb;
                    lastRun = e;
                }
            }

            // 拆分到两个新桶
            if ((runBit & n) == 0)
                // 保持原位置
                hiHead = lastRun;
            else
                // 移到新位置
                loHead = lastRun;

            // ...
        }

        // 已完成的桶替换为 ForwardingNode
        setTabAt(nextTab, i, loHead);
        setTabAt(nextTab, i + n, hiHead);
        setTabAt(tab, i, new ForwardingNode<>(nextTab));
    }
}

五、追问升级

面试官:"ConcurrentHashMap 扩容时,旧数组会被 GC 回收吗?"

// 扩容完成后:
// 1. table 引用指向新数组 nextTable
// 2. nextTable = null,释放旧数组引用
// 3. 如果没有其他引用,旧数组可以被 GC 回收

// 这是安全的,因为:
// 1. 所有访问都通过 ForwardingNode 指向新数组
// 2. 旧数组的节点在 transfer 过程中已全部迁移
// 3. 没有引用指向旧数组后,可以被 GC