线程同步机制

小王在多线程面试中被问到这样一个问题:

"你有两个线程,一个往共享队列写数据,另一个从队列读数据,怎么保证不丢数据、不重复读取?"

小王说:"加个锁就行了。"

面试官继续追问:"那读线程发现队列为空怎么办?一直在循环里判断吗?"

小王说:"用while循环等...或者...wait/notify?"

面试官点点头:"那你讲讲wait和sleep的区别,为什么wait需要放在循环里而不是if里?"

小王彻底卡住了。

多线程同步这个话题,看起来简单,但背后涉及的细节多到让人怀疑人生。今天,我们把常见的同步机制全部讲透。

一、从一个问题开始

先看一段"看起来没问题"的代码:

# 共享变量
balance = 0

def deposit(amount):
    global balance
    balance += amount  # 这里的操作不是原子的!

def withdraw(amount):
    global balance
    if balance >= amount:
        balance -= amount

问题在哪?balance += amount这个操作,在CPU层面可能会被分解成:

1. 读取balance到寄存器(LOAD)
2. 把amount加到寄存器(ADD)
3. 把结果写回内存(STORE)

如果两个线程同时执行:

线程A:LOAD balance=100 → ADD 50 → STORE balance=150
线程B:LOAD balance=100 → ADD 30 → STORE balance=130

最终结果是什么?130,而不是180。50块钱就这么"消失"了。

这就是经典的竞态条件(Race Condition)问题。

【直观类比】

同步机制 = 交通规则

想象一个双向单车道:

┌────────────────────────────────┐
│         临界区(桥)            │
│  ←──────────────────────────→  │
│         只能过一辆车            │
└────────────────────────────────┘

如果没有交通规则会发生什么?

场景:两辆车同时上桥
结果:桥塌了,数据乱了

六种同步工具 = 六种交通规则

同步工具类比适用场景
互斥锁红绿灯保护共享资源
读写锁读写专用通道读多写少场景
条件变量等待叫号生产者-消费者
信号量限流闸机计数资源池
屏障人齐发车等待多个线程完成
原子操作单车辆原子通过简单变量操作

互斥锁 = 红绿灯

┌─────────────────────────────────────┐
│  线程A ──→ 等待绿灯 ──→ 通过 ──→ 锁住 │
├─────────────────────────────────────┤
│  线程B ──→ 等待红灯 ──→ 通过 ──→ 锁住 │
├─────────────────────────────────────┤
│  线程A ──→ 释放锁 ──→ 红灯变绿 ──→ 让道│
└─────────────────────────────────────┘

条件变量 = 银行叫号

┌─────────────────────────────────────┐
│  客户(线程)到达 → 看屏幕叫号        │
│  没到自己号 → 等待(阻塞)           │
│  到自己号 → 办理业务(继续执行)      │
│  办理完成 → 叫下一个号(signal)     │
└─────────────────────────────────────┘

二、核心原理

1. 互斥锁(Mutex)

最基础的同步机制,保证同一时刻只有一个线程能访问临界区:

import threading

balance = 0
lock = threading.Lock()

def deposit(amount):
    global balance
    with lock:  # 获取锁
        balance += amount
    # 锁在这里自动释放

def withdraw(amount):
    global balance
    with lock:
        if balance >= amount:
            balance -= amount

底层实现

// pthread_mutex_lock 简化实现
void pthread_mutex_lock(pthread_mutex_t *mutex) {
    while (atomic_swap(&mutex->locked, 1) == 1) {
        // 自旋等待,或者切换到内核等待队列
        sched_yield();
    }
}

void pthread_mutex_unlock(pthread_mutex_t *mutex) {
    atomic_store(&mutex->locked, 0);
    // 唤醒等待的线程
}

2. 读写锁(Read-Write Lock)

读写锁针对"读多写少"场景优化:

操作互斥锁读写锁
独占共享(多个可同时读)
独占独占
import threading

rwlock = threading.RWLock()

def read_data():
    with rwlock.reader_lock():
        # 多个线程可以同时读
        return data

def write_data(new_data):
    with rwlock.writer_lock():
        # 独占访问
        global data
        data = new_data

适用场景

配置数据:读10000次/s,写1次/s

互斥锁:每次读都要争抢锁 → 性能差
读写锁:9999次读并行,只有1次写独占 → 性能好

3. 条件变量(Condition Variable)

条件变量用于线程间的协作等待:

import threading

queue = []
not_empty = threading.Condition()

def producer(item):
    with not_empty:
        queue.append(item)
        not_empty.notify()  # 通知一个等待者

def consumer():
    with not_empty:
        while not queue:  # 为什么是while而不是if?
            not_empty.wait()  # 等待通知
        item = queue.pop(0)
        return item

为什么wait要用while而不是if?

# 错误写法
if not queue:
    not_empty.wait()
    item = queue.pop(0)

# 问题:被唤醒后,queue可能还是空的!
# 场景:
# 线程A:queue空了,进入wait
# 线程B:producer放了一个item后notify
# 线程A:wait返回
# 但如果另一个线程C把这个item拿走了呢?
# 线程A直接pop就会报错!

4. 信号量(Semaphore)

信号量是一种计数器,用于限制并发数量:

import threading

# 连接池:最多10个并发连接
pool_semaphore = threading.Semaphore(10)

def get_connection():
    pool_semaphore.acquire()  # 获取许可
    try:
        conn = get_from_pool()
        return conn
    finally:
        pool_semaphore.release()  # 归还许可

信号量 vs 互斥锁

维度互斥锁信号量
计数0或1任意非负数
持有者必须由持有者释放谁持有都可以释放
适用场景独占资源资源池、限流

5. 屏障(Barrier)

屏障让多个线程等待彼此,直到大家都到达:

import threading

# 等待5个线程都完成准备
barrier = threading.Barrier(5)

def worker():
    prepare_data()  # 准备数据
    barrier.wait()  # 等待其他线程
    start_processing()  # 同时开始处理

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()

使用场景

游戏加载:所有资源加载完成 → 同时开始游戏
并行计算:所有分片计算完成 → 合并结果

6. 原子操作(Atomic)

对于简单的整数操作,原子操作比锁更高效:

# Python 3.8+
from atomic import AtomicInt

balance = AtomicInt(0)

def deposit(amount):
    balance.add(amount)  # 原子操作,无需锁

def withdraw(amount):
    while True:
        old = balance.value
        if old < amount:
            raise Exception("Insufficient funds")
        if balance.compare_exchange(old, old - amount):
            return

Java中的原子类

import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger balance = new AtomicInteger(0);

public void deposit(int amount) {
    balance.addAndGet(amount);
}

public void withdraw(int amount) {
    while (true) {
        int current = balance.get();
        if (current < amount) {
            throw new RuntimeException("Insufficient funds");
        }
        if (balance.compareAndSet(current, current - amount)) {
            return;
        }
    }
}

三、边界与特例

1. 锁的种类与选择

锁类型特点适用场景
悲观锁先加锁再操作冲突频繁
乐观锁用CAS检测冲突冲突稀少
公平锁按请求顺序获取需要FIFO
非公平锁允许插队吞吐量更高
自旋锁循环等待临界区极短
阻塞锁切换线程临界区较长

2. 死锁与活锁

死锁(Deadlock)

线程A:持有锁1,等待锁2
线程B:持有锁2,等待锁1
结果:两个线程都卡死了

活锁(Livelock)

线程A:检测到冲突,让步
线程B:检测到冲突,让步
线程A:重试,又让步
线程B:重试,又让步
结果:两个线程都在动,但都在让路,永远无法前进

活锁的解决方案:随机退让

import random

def worker():
    while True:
        if try_acquire_lock():
            do_work()
            release_lock()
        else:
            # 加点随机性,避免两个线程完全同步
            sleep(random.uniform(0, 0.1))

3. ABA问题

# CAS操作检查的是"值是否等于A"
# 但值从A变成B再变回A,CAS是检测不出来的

balance = 100
# 线程A:读取balance=100,准备+50
# 线程B:balance=100 → 改为200 → 又改回100
# 线程A:CAS(100, 150) → 成功!
# 但实际上中间有其他线程操作过!

解决方案:加版本号

# 给每次修改都加版本号
class VersionedRef:
    def __init__(self, value, version):
        self.value = value
        self.version = version

4. 锁的层级问题

# 错误示范:不同顺序加锁
def withdraw(amount):
    lock1.acquire()
    lock2.acquire()
    # ...

def deposit(amount):
    lock2.acquire()  # 换个顺序!
    lock1.acquire()
    # ...

解决方案:固定加锁顺序

# 所有代码都用同一个顺序
# 先lock1,再lock2

四、常见误区

❌ 误区一:锁保护的越细越好

锁太细会增加复杂度,而且可能导致死锁。

错误示范:
balance_lock = Lock()
counter_lock = Lock()
timestamp_lock = Lock()

def update():
    with balance_lock:
        # 干点活
    with counter_lock:
        # 干点活
    with timestamp_lock:
        # 干点活

❌ 误区二:wait和sleep一样

维度waitsleep
所属Object方法Thread方法
释放锁
超时后状态不确定超时后直接返回
被interrupt抛异常抛异常

❌ 误区三:volatile能替代锁

volatile只能保证可见性,不能保证原子性

# volatile只能保证:其他线程能看到最新的值
# 但不能保证:++操作的原子性

balance = 0  # 用volatile标记
balance += 1  # 这个操作仍然是3步,不是原子的!

❌ 误区四:尽量避免锁

锁是必要的同步工具,不是因为它不好才需要避免。

真正的目标不是"不用锁",而是"正确地用锁"。

五、记忆技巧

一句话总结

互斥锁独占,信号量计数,条件变量等待,原子操作无锁

口诀

"共享资源要加锁,读多写少读写锁" "生产者等消费者,条件变量来通知" "资源池限流用信号量,齐步走用屏障" "ABA问题加版本号,锁顺序要统一"

场景速查表

场景推荐同步方式
保护单个变量原子操作
保护简单资源互斥锁
读多写少读写锁
等待条件条件变量
资源池限流信号量
多线程汇合屏障
复杂对象互斥锁 + 条件变量

六、实战检验

自检题目

题目1:为什么wait需要放在循环中?

点击查看答案

wait放在循环中是为了处理虚假唤醒(Spurious Wakeup)

虚假唤醒是指:没有其他线程调用notify,线程也会从wait中醒来。

可能的原因:

  • 系统内核的实现问题
  • 多个线程被同时唤醒,但只处理了一个数据
  • 超时唤醒

在while循环中,醒来后会再次检查条件,避免错误地继续执行。

题目2:生产者-消费者问题中,为什么要用while而不是if判断队列状态?

点击查看答案
  1. 虚假唤醒:同一时刻可能有多个消费者被唤醒,但队列只有1个元素
  2. 惊群效应:notifyAll会唤醒所有等待者,即使只添加了一个元素
  3. 超时唤醒:wait设置超时后醒来,需要重新检查条件

在while循环中,即使发生虚假唤醒,醒来后也会再次检查条件,确保安全。

题目3:为什么Redis使用单线程模型?

点击查看答案
  1. 避免锁开销:单线程天然没有锁竞争
  2. 上下文切换:单线程避免了CPU在多线程间切换的开销
  3. IO多路复用:通过epoll,单线程可以高效处理大量并发连接
  4. 简单性:无锁模型让代码更简单,出错概率更低

Redis的核心瓶颈是内存和IO,不是CPU。单线程足以处理。

面试追问预测

问题考察点进阶追问
synchronized vs Lock锁实现ReentrantLock的公平锁怎么实现
生产者-消费者条件变量如果notify和wait顺序错了会怎样
死锁条件同步基础如何避免死锁

七、生产实战案例

案例:数据库连接池的线程安全实现

import threading
import queue

class ConnectionPool:
    def __init__(self, size=10):
        self.pool = queue.Queue(size)
        self.lock = threading.Lock()
        for _ in range(size):
            self.pool.put(self._create_connection())
    
    def acquire(self):
        # 获取连接时,超时等待
        return self.pool.get(timeout=5)
    
    def release(self, conn):
        # 归还连接
        self.pool.put(conn)

案例:Java ConcurrentHashMap的分段锁

JDK 7的ConcurrentHashMap使用分段锁:

// 16个Segment,每个有自己的锁
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 每个Segment类似一个小HashMap
static class Segment<K,V> extends ReentrantLock {
    // ...有自己的数组和锁
}

这样不同key的操作可以并行执行,只有当两个操作落到同一个Segment时才需要加锁。

JDK 8之后改用CAS + synchronized,进一步提升了性能。

💡

面试时能说清楚"锁的粒度选择"和"性能权衡",说明你对并发编程的理解已经到了工程级别。