#ReentrantReadWriteLock读写锁
#一个让候选人翻车的面试题
面试官问:"读写锁适用于什么场景?"
候选人小张说:"读多写少的场景。"
面试官追问:"为什么读多写少时读写锁比普通锁性能好?"
小张说:"因为读的时候不用加锁?"
面试官继续问:"那读和写之间怎么保证可见性?"
小张答不上来了。
读写锁是Java并发中的经典设计。很多同学知道"读多写少用读写锁",但对为什么读可以并发、读写如何互斥、写锁饥饿问题理解不深。
今天这篇文章,把读写锁讲透。
#读写锁的基本概念
#读写锁的规则
public class ReadWriteLockDemo {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private int value = 0;
// 读操作:可以多个线程同时读
public int read() {
readLock.lock();
try {
return value;
} finally {
readLock.unlock();
}
}
// 写操作:独占,同一时间只能有一个线程写
public void write(int newValue) {
writeLock.lock();
try {
value = newValue;
} finally {
writeLock.unlock();
}
}
}读写锁的三条规则:
| 组合 | 是否允许 | 原因 |
|---|---|---|
| 读-读 | ✅ 允许 | 不涉及数据修改,并发安全 |
| 读-写 | ❌ 互斥 | 读可能读到脏数据 |
| 写-写 | ❌ 互斥 | 写操作需要独占 |
#使用场景
// ✅ 读多写少的缓存
public class CacheDemo<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public V get(K key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void put(K key, V value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}
// ✅ 配置文件的读写
public class ConfigService {
private final Map<String, String> config = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 多个线程可以同时读取
public String getConfig(String key) {
rwLock.readLock().lock();
try {
return config.get(key);
} finally {
rwLock.readLock().unlock();
}
}
// 只有一个线程可以写
public void updateConfig(String key, String value) {
rwLock.writeLock().lock();
try {
config.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}#ReentrantReadWriteLock的实现
#state的位划分
// ReentrantReadWriteLock的state结构
// 高16位:读锁计数(同时持有的读锁数量)
// 低16位:写锁计数(0或1,因为是互斥锁)
static final int SHARED_SHIFT = 16; // 16位
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 0x10000 = 65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 0xFFFF
// 获取读锁计数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 获取写锁计数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }#state状态示例
无锁状态: state = 0
高16位 = 0, 低16位 = 0
写锁持有: state = 1
高16位 = 0, 低16位 = 1
读锁持有(1个线程): state = 65536
高16位 = 1, 低16位 = 0
读锁持有(多个线程): state = 65536 * N (N个线程各持有1个读锁)
高16位 = N, 低16位 = 0
写锁重入(同一线程): state = 1 + 65536 (假设有读锁重入)
高16位 = 1, 低16位 = 1#获取读锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 检查是否有写锁(互斥)
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current) {
return -1; // 有写锁且不是自己,获取失败
}
// 如果是自己持有写锁,可以继续获取读锁
}
// 读锁计数
int r = sharedCount(c);
if (!readerShouldBlock()) {
// 检查是否达到最大读锁数
if (r < MAX_COUNT) {
// CAS增加读锁计数
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 第一个获取读锁的线程,记录firstReader
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入
firstReaderHoldCount++;
} else {
// 其他线程,更新HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
cachedHoldCounter = rh = readHolds.get();
}
rh.count++;
}
return 1;
}
}
}
// 获取失败,自旋重试
return fullTryAcquireShared(current);
}#获取写锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 已有锁
if (w == 0 || current != getExclusiveOwnerThread()) {
// 有读锁(w=0)或有其他线程持有写锁,获取失败
return false;
}
// 写锁重入
if (w + acquires > MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
setState(c + acquires);
return true;
}
// 无锁,尝试CAS获取
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
return false;
}
setExclusiveOwnerThread(current);
return true;
}#写锁饥饿问题
#问题的根源
public class WriteStarvationDemo {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void starvationProblem() {
// 场景:大量读线程持续获取读锁
// 写线程可能永远得不到执行
// 读线程1: 获取读锁
// 读线程2: 获取读锁
// 读线程3: 获取读锁
// ... (持续有读线程)
// 写线程: 等待写锁 (无法获取,因为一直有读锁)
}
}#解决方案:公平读写锁
public class FairReadWriteLock {
// 公平读写锁
private final ReadWriteLock fairRwLock =
new ReentrantReadWriteLock(true);
// 非公平读写锁(默认)
private final ReadWriteLock unfairRwLock =
new ReentrantReadWriteLock(false);
public void fairWriteAccess() {
// 公平模式下,写线程会等待所有排队的读线程
// 避免写饥饿,但降低吞吐量
}
}#解决方案:降级锁
public class LockDowngradeDemo {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void lockDowngrade() {
rwLock.writeLock().lock();
try {
// 写操作
int value = compute();
cache.put("key", value);
// 降级为读锁:释放写锁前获取读锁
rwLock.readLock().lock();
} finally {
rwLock.writeLock().unlock(); // 释放写锁
}
try {
// 此时只有读锁,读取刚刚写入的值
int result = cache.get("key");
process(result);
} finally {
rwLock.readLock().unlock();
}
}
}#缓存的最佳实践
#非线程安全版本的演进
// ❌ 原始版本:非线程安全
public class UnsafeCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
public V get(K key) {
if (!cache.containsKey(key)) {
// 可能有多个线程同时通过这个检查
cache.put(key, loadFromDb(key));
}
return cache.get(key);
}
}#synchronized版本的演进
// ⚠️ synchronized版本:可以但性能差
public class SyncedCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
public synchronized V get(K key) {
if (!cache.containsKey(key)) {
cache.put(key, loadFromDb(key));
}
return cache.get(key);
}
}#ReadWriteLock版本
// ✅ ReadWriteLock版本:读多写少时性能好
public class CachedCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private volatile boolean cacheValid = false;
public V get(K key) {
rwLock.readLock().lock();
try {
V value = cache.get(key);
if (value == null) {
// 读锁释放,准备获取写锁
rwLock.readLock().unlock();
rwLock.writeLock().lock();
try {
// 双重检查
if (cache.get(key) == null) {
value = loadFromDb(key);
cache.put(key, value);
} else {
value = cache.get(key);
}
// 降级为读锁
rwLock.readLock().lock();
} finally {
rwLock.writeLock().unlock();
}
}
return value;
} finally {
rwLock.readLock().unlock();
}
}
private V loadFromDb(K key) {
// 模拟数据库查询
return (V) key;
}
}#ConcurrentHashMap替代
// ✅ 现代替代:ConcurrentHashMap
public class ModernCache<K, V> {
private final ConcurrentHashMap<K, V> cache =
new ConcurrentHashMap<>();
public V get(K key) {
// computeIfAbsent原子操作
return cache.computeIfAbsent(key, this::loadFromDb);
}
private V loadFromDb(K key) {
return (V) key;
}
}#读锁的HoldCounter
#为什么要记录HoldCounter
// 问题:如何知道某个线程持有了多少个读锁?
public class HoldCounterProblem {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int readLockCount = 0;
public void nestedRead() {
rwLock.readLock().lock();
try {
readLockCount++; // ❌ 无法准确记录
nestedReadHelper(); // 重入
readLockCount--;
} finally {
rwLock.readLock().unlock();
}
}
public void nestedReadHelper() {
rwLock.readLock().lock();
try {
// 重入读锁
} finally {
rwLock.readLock().unlock();
}
}
}#HoldCounter的实现
// HoldCounter存储每个线程持有的读锁数量
static final class HoldCounter {
int count; // 持有数量
long tid; // 线程ID
// ThreadLocal存储每个线程的HoldCounter
static final ThreadLocal<HoldCounter> readHolds =
new ThreadLocal<HoldCounter>();
}
// 获取当前线程的HoldCounter
private HoldCounter getReadHoldCount() {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
return null; // 不持有写锁
}
HoldCounter rh = readHolds.get();
if (rh.count == 0) {
readHolds.set(rh = new HoldCounter());
}
return rh;
}#面试中的高频追问
#追问1:读写锁如何保证可见性?
通过happens-before规则:
- 写锁释放 happens-before 读锁获取
- 读锁释放 happens-before 写锁获取
这保证了写线程对数据的修改,对后续获取读锁的线程可见。
#追问2:ReadLock和WriteLock可以同时持有吗?
可以。同一线程可以先获取写锁,再获取读锁(锁降级)。但不能先获取读锁再获取写锁(会导致死锁)。
public class LockDowngrade {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void correctDowngrade() {
rwLock.writeLock().lock();
try {
// 修改数据
modify();
// 降级:获取读锁
rwLock.readLock().lock();
} finally {
rwLock.writeLock().unlock();
}
try {
// 读数据
read();
} finally {
rwLock.readLock().unlock();
}
}
}#追问3:读锁重入的实现原理?
通过HoldCounter记录每个线程持有的读锁数量:
- firstReader:第一个获取读锁的线程
- firstReaderHoldCount:第一个线程的重入次数
- cachedHoldCounter:缓存的HoldCounter
- readHolds:ThreadLocal,存储当前线程的HoldCounter
#追问4:什么场景不适合读写锁?
- 写操作频繁:读写锁的优势消失,反而增加复杂度
- 读操作复杂:读操作本身耗时,读锁持有时间长,影响吞吐量
- 需要原子读写:读写锁不能保证读-写的原子性
#【学习小结】
- 读写锁规则:读-读允许,读-写互斥,写-写互斥
- state设计:高16位读锁计数,低16位写锁计数
- 适用场景:读多写少的缓存、配置、统计数据
- 写锁饥饿:大量读线程时,写锁可能等待很久
- 锁降级:先写后读,降级为读锁,提高并发
- HoldCounter:记录每个线程持有的读锁数量,支持重入
- 现代替代:ConcurrentHashMap.computeIfAbsent更简洁
延伸阅读: