线程同步机制
小王在多线程面试中被问到这样一个问题:
"你有两个线程,一个往共享队列写数据,另一个从队列读数据,怎么保证不丢数据、不重复读取?"
小王说:"加个锁就行了。"
面试官继续追问:"那读线程发现队列为空怎么办?一直在循环里判断吗?"
小王说:"用while循环等...或者...wait/notify?"
面试官点点头:"那你讲讲wait和sleep的区别,为什么wait需要放在循环里而不是if里?"
小王彻底卡住了。
多线程同步这个话题,看起来简单,但背后涉及的细节多到让人怀疑人生。今天,我们把常见的同步机制全部讲透。
一、从一个问题开始
先看一段"看起来没问题"的代码:
# 共享变量
balance = 0
def deposit(amount):
global balance
balance += amount # 这里的操作不是原子的!
def withdraw(amount):
global balance
if balance >= amount:
balance -= amount
问题在哪?balance += amount这个操作,在CPU层面可能会被分解成:
1. 读取balance到寄存器(LOAD)
2. 把amount加到寄存器(ADD)
3. 把结果写回内存(STORE)
如果两个线程同时执行:
线程A:LOAD balance=100 → ADD 50 → STORE balance=150
线程B:LOAD balance=100 → ADD 30 → STORE balance=130
最终结果是什么?130,而不是180。50块钱就这么"消失"了。
这就是经典的竞态条件(Race Condition)问题。
【直观类比】
同步机制 = 交通规则
想象一个双向单车道:
┌────────────────────────────────┐
│ 临界区(桥) │
│ ←──────────────────────────→ │
│ 只能过一辆车 │
└────────────────────────────────┘
如果没有交通规则会发生什么?
六种同步工具 = 六种交通规则
互斥锁 = 红绿灯
┌─────────────────────────────────────┐
│ 线程A ──→ 等待绿灯 ──→ 通过 ──→ 锁住 │
├─────────────────────────────────────┤
│ 线程B ──→ 等待红灯 ──→ 通过 ──→ 锁住 │
├─────────────────────────────────────┤
│ 线程A ──→ 释放锁 ──→ 红灯变绿 ──→ 让道│
└─────────────────────────────────────┘
条件变量 = 银行叫号
┌─────────────────────────────────────┐
│ 客户(线程)到达 → 看屏幕叫号 │
│ 没到自己号 → 等待(阻塞) │
│ 到自己号 → 办理业务(继续执行) │
│ 办理完成 → 叫下一个号(signal) │
└─────────────────────────────────────┘
二、核心原理
1. 互斥锁(Mutex)
最基础的同步机制,保证同一时刻只有一个线程能访问临界区:
import threading
balance = 0
lock = threading.Lock()
def deposit(amount):
global balance
with lock: # 获取锁
balance += amount
# 锁在这里自动释放
def withdraw(amount):
global balance
with lock:
if balance >= amount:
balance -= amount
底层实现:
// pthread_mutex_lock 简化实现
void pthread_mutex_lock(pthread_mutex_t *mutex) {
while (atomic_swap(&mutex->locked, 1) == 1) {
// 自旋等待,或者切换到内核等待队列
sched_yield();
}
}
void pthread_mutex_unlock(pthread_mutex_t *mutex) {
atomic_store(&mutex->locked, 0);
// 唤醒等待的线程
}
2. 读写锁(Read-Write Lock)
读写锁针对"读多写少"场景优化:
import threading
rwlock = threading.RWLock()
def read_data():
with rwlock.reader_lock():
# 多个线程可以同时读
return data
def write_data(new_data):
with rwlock.writer_lock():
# 独占访问
global data
data = new_data
适用场景:
配置数据:读10000次/s,写1次/s
互斥锁:每次读都要争抢锁 → 性能差
读写锁:9999次读并行,只有1次写独占 → 性能好
3. 条件变量(Condition Variable)
条件变量用于线程间的协作等待:
import threading
queue = []
not_empty = threading.Condition()
def producer(item):
with not_empty:
queue.append(item)
not_empty.notify() # 通知一个等待者
def consumer():
with not_empty:
while not queue: # 为什么是while而不是if?
not_empty.wait() # 等待通知
item = queue.pop(0)
return item
为什么wait要用while而不是if?
# 错误写法
if not queue:
not_empty.wait()
item = queue.pop(0)
# 问题:被唤醒后,queue可能还是空的!
# 场景:
# 线程A:queue空了,进入wait
# 线程B:producer放了一个item后notify
# 线程A:wait返回
# 但如果另一个线程C把这个item拿走了呢?
# 线程A直接pop就会报错!
4. 信号量(Semaphore)
信号量是一种计数器,用于限制并发数量:
import threading
# 连接池:最多10个并发连接
pool_semaphore = threading.Semaphore(10)
def get_connection():
pool_semaphore.acquire() # 获取许可
try:
conn = get_from_pool()
return conn
finally:
pool_semaphore.release() # 归还许可
信号量 vs 互斥锁:
5. 屏障(Barrier)
屏障让多个线程等待彼此,直到大家都到达:
import threading
# 等待5个线程都完成准备
barrier = threading.Barrier(5)
def worker():
prepare_data() # 准备数据
barrier.wait() # 等待其他线程
start_processing() # 同时开始处理
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
使用场景:
游戏加载:所有资源加载完成 → 同时开始游戏
并行计算:所有分片计算完成 → 合并结果
6. 原子操作(Atomic)
对于简单的整数操作,原子操作比锁更高效:
# Python 3.8+
from atomic import AtomicInt
balance = AtomicInt(0)
def deposit(amount):
balance.add(amount) # 原子操作,无需锁
def withdraw(amount):
while True:
old = balance.value
if old < amount:
raise Exception("Insufficient funds")
if balance.compare_exchange(old, old - amount):
return
Java中的原子类:
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger balance = new AtomicInteger(0);
public void deposit(int amount) {
balance.addAndGet(amount);
}
public void withdraw(int amount) {
while (true) {
int current = balance.get();
if (current < amount) {
throw new RuntimeException("Insufficient funds");
}
if (balance.compareAndSet(current, current - amount)) {
return;
}
}
}
三、边界与特例
1. 锁的种类与选择
2. 死锁与活锁
死锁(Deadlock):
线程A:持有锁1,等待锁2
线程B:持有锁2,等待锁1
结果:两个线程都卡死了
活锁(Livelock):
线程A:检测到冲突,让步
线程B:检测到冲突,让步
线程A:重试,又让步
线程B:重试,又让步
结果:两个线程都在动,但都在让路,永远无法前进
活锁的解决方案:随机退让
import random
def worker():
while True:
if try_acquire_lock():
do_work()
release_lock()
else:
# 加点随机性,避免两个线程完全同步
sleep(random.uniform(0, 0.1))
3. ABA问题
# CAS操作检查的是"值是否等于A"
# 但值从A变成B再变回A,CAS是检测不出来的
balance = 100
# 线程A:读取balance=100,准备+50
# 线程B:balance=100 → 改为200 → 又改回100
# 线程A:CAS(100, 150) → 成功!
# 但实际上中间有其他线程操作过!
解决方案:加版本号
# 给每次修改都加版本号
class VersionedRef:
def __init__(self, value, version):
self.value = value
self.version = version
4. 锁的层级问题
# 错误示范:不同顺序加锁
def withdraw(amount):
lock1.acquire()
lock2.acquire()
# ...
def deposit(amount):
lock2.acquire() # 换个顺序!
lock1.acquire()
# ...
解决方案:固定加锁顺序
# 所有代码都用同一个顺序
# 先lock1,再lock2
四、常见误区
❌ 误区一:锁保护的越细越好
锁太细会增加复杂度,而且可能导致死锁。
错误示范:
balance_lock = Lock()
counter_lock = Lock()
timestamp_lock = Lock()
def update():
with balance_lock:
# 干点活
with counter_lock:
# 干点活
with timestamp_lock:
# 干点活
❌ 误区二:wait和sleep一样
❌ 误区三:volatile能替代锁
volatile只能保证可见性,不能保证原子性:
# volatile只能保证:其他线程能看到最新的值
# 但不能保证:++操作的原子性
balance = 0 # 用volatile标记
balance += 1 # 这个操作仍然是3步,不是原子的!
❌ 误区四:尽量避免锁
锁是必要的同步工具,不是因为它不好才需要避免。
真正的目标不是"不用锁",而是"正确地用锁"。
五、记忆技巧
一句话总结
互斥锁独占,信号量计数,条件变量等待,原子操作无锁
口诀
"共享资源要加锁,读多写少读写锁"
"生产者等消费者,条件变量来通知"
"资源池限流用信号量,齐步走用屏障"
"ABA问题加版本号,锁顺序要统一"
场景速查表
六、实战检验
自检题目
题目1:为什么wait需要放在循环中?
点击查看答案
wait放在循环中是为了处理虚假唤醒(Spurious Wakeup)。
虚假唤醒是指:没有其他线程调用notify,线程也会从wait中醒来。
可能的原因:
- 系统内核的实现问题
- 多个线程被同时唤醒,但只处理了一个数据
- 超时唤醒
在while循环中,醒来后会再次检查条件,避免错误地继续执行。
题目2:生产者-消费者问题中,为什么要用while而不是if判断队列状态?
点击查看答案
- 虚假唤醒:同一时刻可能有多个消费者被唤醒,但队列只有1个元素
- 惊群效应:notifyAll会唤醒所有等待者,即使只添加了一个元素
- 超时唤醒:wait设置超时后醒来,需要重新检查条件
在while循环中,即使发生虚假唤醒,醒来后也会再次检查条件,确保安全。
题目3:为什么Redis使用单线程模型?
点击查看答案
- 避免锁开销:单线程天然没有锁竞争
- 上下文切换:单线程避免了CPU在多线程间切换的开销
- IO多路复用:通过epoll,单线程可以高效处理大量并发连接
- 简单性:无锁模型让代码更简单,出错概率更低
Redis的核心瓶颈是内存和IO,不是CPU。单线程足以处理。
面试追问预测
七、生产实战案例
案例:数据库连接池的线程安全实现
import threading
import queue
class ConnectionPool:
def __init__(self, size=10):
self.pool = queue.Queue(size)
self.lock = threading.Lock()
for _ in range(size):
self.pool.put(self._create_connection())
def acquire(self):
# 获取连接时,超时等待
return self.pool.get(timeout=5)
def release(self, conn):
# 归还连接
self.pool.put(conn)
案例:Java ConcurrentHashMap的分段锁
JDK 7的ConcurrentHashMap使用分段锁:
// 16个Segment,每个有自己的锁
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 每个Segment类似一个小HashMap
static class Segment<K,V> extends ReentrantLock {
// ...有自己的数组和锁
}
这样不同key的操作可以并行执行,只有当两个操作落到同一个Segment时才需要加锁。
JDK 8之后改用CAS + synchronized,进一步提升了性能。
💡
面试时能说清楚"锁的粒度选择"和"性能权衡",说明你对并发编程的理解已经到了工程级别。