ReentrantReadWriteLock读写锁

一个让候选人翻车的面试题

面试官问:"读写锁适用于什么场景?"

候选人小张说:"读多写少的场景。"

面试官追问:"为什么读多写少时读写锁比普通锁性能好?"

小张说:"因为读的时候不用加锁?"

面试官继续问:"那读和写之间怎么保证可见性?"

小张答不上来了。

读写锁是Java并发中的经典设计。很多同学知道"读多写少用读写锁",但对为什么读可以并发读写如何互斥写锁饥饿问题理解不深。

今天这篇文章,把读写锁讲透。

读写锁的基本概念

读写锁的规则

public class ReadWriteLockDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    
    private int value = 0;
    
    // 读操作:可以多个线程同时读
    public int read() {
        readLock.lock();
        try {
            return value;
        } finally {
            readLock.unlock();
        }
    }
    
    // 写操作:独占,同一时间只能有一个线程写
    public void write(int newValue) {
        writeLock.lock();
        try {
            value = newValue;
        } finally {
            writeLock.unlock();
        }
    }
}

读写锁的三条规则

组合是否允许原因
读-读✅ 允许不涉及数据修改,并发安全
读-写❌ 互斥读可能读到脏数据
写-写❌ 互斥写操作需要独占

使用场景

// ✅ 读多写少的缓存
public class CacheDemo<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    public V get(K key) {
        rwLock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    public void put(K key, V value) {
        rwLock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

// ✅ 配置文件的读写
public class ConfigService {
    private final Map<String, String> config = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    // 多个线程可以同时读取
    public String getConfig(String key) {
        rwLock.readLock().lock();
        try {
            return config.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    // 只有一个线程可以写
    public void updateConfig(String key, String value) {
        rwLock.writeLock().lock();
        try {
            config.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

ReentrantReadWriteLock的实现

state的位划分

// ReentrantReadWriteLock的state结构
// 高16位:读锁计数(同时持有的读锁数量)
// 低16位:写锁计数(0或1,因为是互斥锁)

static final int SHARED_SHIFT   = 16;        // 16位
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  // 0x10000 = 65536
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  // 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // 0xFFFF

// 获取读锁计数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

// 获取写锁计数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

state状态示例

无锁状态: state = 0
         高16位 = 0, 低16位 = 0

写锁持有: state = 1
         高16位 = 0, 低16位 = 1

读锁持有(1个线程): state = 65536
         高16位 = 1, 低16位 = 0

读锁持有(多个线程): state = 65536 * N (N个线程各持有1个读锁)
         高16位 = N, 低16位 = 0

写锁重入(同一线程): state = 1 + 65536 (假设有读锁重入)
         高16位 = 1, 低16位 = 1

获取读锁

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    
    // 检查是否有写锁(互斥)
    if (exclusiveCount(c) != 0) {
        if (getExclusiveOwnerThread() != current) {
            return -1;  // 有写锁且不是自己,获取失败
        }
        // 如果是自己持有写锁,可以继续获取读锁
    }
    
    // 读锁计数
    int r = sharedCount(c);
    if (!readerShouldBlock()) {
        // 检查是否达到最大读锁数
        if (r < MAX_COUNT) {
            // CAS增加读锁计数
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 第一个获取读锁的线程,记录firstReader
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 重入
                    firstReaderHoldCount++;
                } else {
                    // 其他线程,更新HoldCounter
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        cachedHoldCounter = rh = readHolds.get();
                    }
                    rh.count++;
                }
                return 1;
            }
        }
    }
    
    // 获取失败,自旋重试
    return fullTryAcquireShared(current);
}

获取写锁

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    
    if (c != 0) {
        // 已有锁
        if (w == 0 || current != getExclusiveOwnerThread()) {
            // 有读锁(w=0)或有其他线程持有写锁,获取失败
            return false;
        }
        // 写锁重入
        if (w + acquires > MAX_COUNT) {
            throw new Error("Maximum lock count exceeded");
        }
        setState(c + acquires);
        return true;
    }
    
    // 无锁,尝试CAS获取
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
        return false;
    }
    setExclusiveOwnerThread(current);
    return true;
}

写锁饥饿问题

问题的根源

public class WriteStarvationDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    public void starvationProblem() {
        // 场景:大量读线程持续获取读锁
        // 写线程可能永远得不到执行
        
        // 读线程1: 获取读锁
        // 读线程2: 获取读锁
        // 读线程3: 获取读锁
        // ... (持续有读线程)
        // 写线程: 等待写锁 (无法获取,因为一直有读锁)
    }
}

解决方案:公平读写锁

public class FairReadWriteLock {
    // 公平读写锁
    private final ReadWriteLock fairRwLock = 
        new ReentrantReadWriteLock(true);
    
    // 非公平读写锁(默认)
    private final ReadWriteLock unfairRwLock = 
        new ReentrantReadWriteLock(false);
    
    public void fairWriteAccess() {
        // 公平模式下,写线程会等待所有排队的读线程
        // 避免写饥饿,但降低吞吐量
    }
}

解决方案:降级锁

public class LockDowngradeDemo {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    public void lockDowngrade() {
        rwLock.writeLock().lock();
        try {
            // 写操作
            int value = compute();
            cache.put("key", value);
            
            // 降级为读锁:释放写锁前获取读锁
            rwLock.readLock().lock();
        } finally {
            rwLock.writeLock().unlock();  // 释放写锁
        }
        
        try {
            // 此时只有读锁,读取刚刚写入的值
            int result = cache.get("key");
            process(result);
        } finally {
            rwLock.readLock().unlock();
        }
    }
}

缓存的最佳实践

非线程安全版本的演进

// ❌ 原始版本:非线程安全
public class UnsafeCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    
    public V get(K key) {
        if (!cache.containsKey(key)) {
            // 可能有多个线程同时通过这个检查
            cache.put(key, loadFromDb(key));
        }
        return cache.get(key);
    }
}

synchronized版本的演进

// ⚠️ synchronized版本:可以但性能差
public class SyncedCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    
    public synchronized V get(K key) {
        if (!cache.containsKey(key)) {
            cache.put(key, loadFromDb(key));
        }
        return cache.get(key);
    }
}

ReadWriteLock版本

// ✅ ReadWriteLock版本:读多写少时性能好
public class CachedCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private volatile boolean cacheValid = false;
    
    public V get(K key) {
        rwLock.readLock().lock();
        try {
            V value = cache.get(key);
            if (value == null) {
                // 读锁释放,准备获取写锁
                rwLock.readLock().unlock();
                
                rwLock.writeLock().lock();
                try {
                    // 双重检查
                    if (cache.get(key) == null) {
                        value = loadFromDb(key);
                        cache.put(key, value);
                    } else {
                        value = cache.get(key);
                    }
                    // 降级为读锁
                    rwLock.readLock().lock();
                } finally {
                    rwLock.writeLock().unlock();
                }
            }
            return value;
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    private V loadFromDb(K key) {
        // 模拟数据库查询
        return (V) key;
    }
}

ConcurrentHashMap替代

// ✅ 现代替代:ConcurrentHashMap
public class ModernCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = 
        new ConcurrentHashMap<>();
    
    public V get(K key) {
        // computeIfAbsent原子操作
        return cache.computeIfAbsent(key, this::loadFromDb);
    }
    
    private V loadFromDb(K key) {
        return (V) key;
    }
}

读锁的HoldCounter

为什么要记录HoldCounter

// 问题:如何知道某个线程持有了多少个读锁?
public class HoldCounterProblem {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private int readLockCount = 0;
    
    public void nestedRead() {
        rwLock.readLock().lock();
        try {
            readLockCount++;  // ❌ 无法准确记录
            nestedReadHelper();  // 重入
            readLockCount--;
        } finally {
            rwLock.readLock().unlock();
        }
    }
    
    public void nestedReadHelper() {
        rwLock.readLock().lock();
        try {
            // 重入读锁
        } finally {
            rwLock.readLock().unlock();
        }
    }
}

HoldCounter的实现

// HoldCounter存储每个线程持有的读锁数量
static final class HoldCounter {
    int count;          // 持有数量
    long tid;           // 线程ID
    
    // ThreadLocal存储每个线程的HoldCounter
    static final ThreadLocal<HoldCounter> readHolds = 
        new ThreadLocal<HoldCounter>();
}

// 获取当前线程的HoldCounter
private HoldCounter getReadHoldCount() {
    if (getExclusiveOwnerThread() != Thread.currentThread()) {
        return null;  // 不持有写锁
    }
    
    HoldCounter rh = readHolds.get();
    if (rh.count == 0) {
        readHolds.set(rh = new HoldCounter());
    }
    return rh;
}

面试中的高频追问

追问1:读写锁如何保证可见性?

通过happens-before规则:

  • 写锁释放 happens-before 读锁获取
  • 读锁释放 happens-before 写锁获取

这保证了写线程对数据的修改,对后续获取读锁的线程可见。

追问2:ReadLock和WriteLock可以同时持有吗?

可以。同一线程可以先获取写锁,再获取读锁(锁降级)。但不能先获取读锁再获取写锁(会导致死锁)。

public class LockDowngrade {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    
    public void correctDowngrade() {
        rwLock.writeLock().lock();
        try {
            // 修改数据
            modify();
            
            // 降级:获取读锁
            rwLock.readLock().lock();
        } finally {
            rwLock.writeLock().unlock();
        }
        
        try {
            // 读数据
            read();
        } finally {
            rwLock.readLock().unlock();
        }
    }
}

追问3:读锁重入的实现原理?

通过HoldCounter记录每个线程持有的读锁数量:

  • firstReader:第一个获取读锁的线程
  • firstReaderHoldCount:第一个线程的重入次数
  • cachedHoldCounter:缓存的HoldCounter
  • readHolds:ThreadLocal,存储当前线程的HoldCounter

追问4:什么场景不适合读写锁?

  1. 写操作频繁:读写锁的优势消失,反而增加复杂度
  2. 读操作复杂:读操作本身耗时,读锁持有时间长,影响吞吐量
  3. 需要原子读写:读写锁不能保证读-写的原子性

【学习小结】

  1. 读写锁规则:读-读允许,读-写互斥,写-写互斥
  2. state设计:高16位读锁计数,低16位写锁计数
  3. 适用场景:读多写少的缓存、配置、统计数据
  4. 写锁饥饿:大量读线程时,写锁可能等待很久
  5. 锁降级:先写后读,降级为读锁,提高并发
  6. HoldCounter:记录每个线程持有的读锁数量,支持重入
  7. 现代替代:ConcurrentHashMap.computeIfAbsent更简洁

延伸阅读