#Semaphore信号量
#一个让候选人翻车的面试题
面试官问:"Semaphore是什么?能用来做什么?"
候选人小张说:"Semaphore是信号量,用来控制并发数量的。"
面试官追问:"具体怎么用?"
小张说:"acquire()获取信号量,release()释放信号量..."
面试官继续追问:"Semaphore的tryAcquire()和不带参数的acquire()有什么区别?"
小张愣了一下。
Semaphore是JDK并发包中最灵活的工具之一。它不仅仅是"限流"那么简单,理解了它的实现,才能真正用好它。
今天这篇文章,把Semaphore讲透。
#什么是Semaphore
#基本概念
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建信号量,许可数量为3
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取许可");
Thread.sleep(1000);
semaphore.release(); // 释放许可
} catch (InterruptedException e) {}
}, "Thread-" + i).start();
}
}
}运行结果:同一时间只有3个线程在执行,其他线程等待。
#与锁的区别
// 锁:互斥,同一时间只能一个线程持有
private final ReentrantLock lock = new ReentrantLock();
// 信号量:控制并发数,同一时间可以有多个线程持有
private final Semaphore semaphore = new Semaphore(3);
// Lock: 0/1二元信号量
// Semaphore: N元信号量,N可以是任意正整数#Semaphore的实现原理
#基于AQS的共享模式
public class Semaphore {
private final Sync sync;
// 非公平Sync
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平Sync
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 公平:检查队列
if (hasQueuedPredecessors()) {
return -1;
}
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
// 核心获取逻辑
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // state = 许可数量
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) { // overflow
throw new Error("Maximum permit count exceeded");
}
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
}#acquire()的实现
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (tryAcquireShared(arg) < 0) {
// 获取失败,加入等待队列
doAcquireSharedInterruptibly(arg);
}
}#release()的实现
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放成功,唤醒等待的线程
doReleaseShared();
return true;
}
return false;
}#公平 vs 非公平
#非公平Semaphore
public class NonfairSemaphore {
public void demo() {
// 默认是非公平
Semaphore semaphore = new Semaphore(3);
// 非公平:可能插队
// 新线程可能比队列中的线程先获取许可
}
}#公平Semaphore
public class FairSemaphore {
public void demo() {
// 公平
Semaphore semaphore = new Semaphore(3, true);
// 公平:按FIFO顺序获取
// 新线程必须等到队列前面的线程获取许可后才能获取
}
}#使用场景选择
// 限流场景:一般用非公平,提高吞吐量
Semaphore unfairLimit = new Semaphore(100);
// 资源分配场景:可能需要公平,避免饥饿
Semaphore fair分配 = new Semaphore(3, true);#acquire的多种版本
#不带参数的acquire
public class BasicAcquire {
public void demo() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
// 阻塞获取许可,直到成功或被中断
semaphore.acquire();
try {
// 业务逻辑
} finally {
semaphore.release();
}
}
}#带参数的acquire
public class MultiAcquire {
public void demo() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
// 获取多个许可
semaphore.acquire(2); // 阻塞获取2个许可
try {
// 业务逻辑
} finally {
semaphore.release(2); // 释放2个许可
}
}
}#tryAcquire()
public class TryAcquireDemo {
public void demo() {
Semaphore semaphore = new Semaphore(3);
// 非阻塞获取,不等待
if (semaphore.tryAcquire()) {
try {
// 获取成功
} finally {
semaphore.release();
}
} else {
// 获取失败,立即返回
System.out.println("获取失败,不等待");
}
}
}#tryAcquire with timeout
public class TimedAcquireDemo {
public void demo() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
// 等待一定时间获取许可
boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);
if (acquired) {
try {
// 获取成功
} finally {
semaphore.release();
}
} else {
// 超时,获取失败
System.out.println("等待超时");
}
}
}#acquireUninterruptibly
public class UninterruptibleAcquire {
public void demo() {
Semaphore semaphore = new Semaphore(3);
// 不可中断的获取
semaphore.acquireUninterruptibly();
try {
// 业务逻辑
} finally {
semaphore.release();
}
}
}#生产中的实际应用
#场景1:数据库连接池
public class ConnectionPool {
private final Semaphore semaphore;
private final List<Connection> connections;
public ConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
this.connections = new ArrayList<>(poolSize);
for (int i = 0; i < poolSize; i++) {
connections.add(createConnection());
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return connections.remove(connections.size() - 1);
}
public void releaseConnection(Connection conn) {
connections.add(conn);
semaphore.release();
}
}#场景2:限流器
public class RateLimiter {
private final Semaphore semaphore;
private final int maxQPS;
private final long interval;
private long lastReset = System.currentTimeMillis();
public RateLimiter(int maxQPS) {
this.maxQPS = maxQPS;
this.interval = 1000; // 1秒
this.semaphore = new Semaphore(maxQPS);
}
public boolean tryAcquire() {
refreshPermits();
return semaphore.tryAcquire();
}
public void acquire() throws InterruptedException {
refreshPermits();
semaphore.acquire();
}
private void refreshPermits() {
long now = System.currentTimeMillis();
if (now - lastReset >= interval) {
semaphore.release(maxQPS - semaphore.availablePermits());
semaphore.acquire(maxQPS); // 重置
lastReset = now;
}
}
}#场景3:对象池
public class ObjectPool<T> {
private final Semaphore semaphore;
private final PooledObjectFactory<T> factory;
private final ConcurrentLinkedQueue<T> pool;
public ObjectPool(int size, PooledObjectFactory<T> factory) {
this.semaphore = new Semaphore(size);
this.factory = factory;
this.pool = new ConcurrentLinkedQueue<>();
for (int i = 0; i < size; i++) {
pool.offer(factory.create());
}
}
public T borrow() throws InterruptedException {
semaphore.acquire();
T obj = pool.poll();
if (obj == null) {
obj = factory.create();
}
return obj;
}
public void release(T obj) {
if (pool.offer(obj)) {
semaphore.release();
}
}
public interface PooledObjectFactory<T> {
T create();
}
}#场景4:信号量实现锁
public class SemaphoreAsLock {
private final Semaphore semaphore = new Semaphore(1);
public void lock() throws InterruptedException {
semaphore.acquire();
}
public void unlock() {
semaphore.release();
}
// 实际上等价于:
// private final Semaphore semaphore = new Semaphore(1, true); // 公平锁
// private final Semaphore semaphore = new Semaphore(1); // 非公平锁
}#Semaphore的高级用法
#leases(许可租借)
public class LeasedSemaphore {
public void demo() {
// Semaphore没有内置的"自动归还"机制
// 需要手动管理release
// 推荐用法:try-finally
Semaphore semaphore = new Semaphore(3);
semaphore.acquire();
try {
// 业务逻辑
} finally {
semaphore.release();
}
}
}#draining(获取并清空)
public class DrainingSemaphore {
public void demo() {
Semaphore semaphore = new Semaphore(5);
// 获取所有可用许可
int available = semaphore.availablePermits();
// drain permits,但不阻塞
int drained = semaphore.drainPermits();
System.out.println("获取了 " + drained + " 个许可");
}
}#查询状态
public class SemaphoreStatus {
public void demo() {
Semaphore semaphore = new Semaphore(3);
// 查看可用许可数
System.out.println("可用许可: " + semaphore.availablePermits());
// 查看是否有线程在等待
// 需要通过hasQueuedThreads(),但Semaphore没有直接暴露
// 需要继承或包装
// 查看队列长度
// Semaphore本身不暴露队列信息
}
}#常见错误
#忘记release
public class ForgetRelease {
public void mistake() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
semaphore.acquire();
if (someCondition()) {
return; // ❌ 忘记release,导致许可泄漏
}
semaphore.release();
}
// ✅ 正确做法
public void correct() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
semaphore.acquire();
try {
if (someCondition()) {
return;
}
} finally {
semaphore.release(); // 在finally中释放
}
}
}#acquire和release不匹配
public class MismatchAcquireRelease {
public void mistake() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
semaphore.acquire(2); // 获取2个许可
semaphore.release(); // ❌ 只释放1个,应该release(2)
semaphore.release(2); // ✅
}
}#在异常中忘记release
public class ExceptionInAcquire {
public void mistake() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
semaphore.acquire();
doSomething(); // 如果这里抛出异常
semaphore.release(); // 不会执行
}
// ✅ 正确做法
public void correct() throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
semaphore.acquire();
try {
doSomething();
} finally {
semaphore.release();
}
}
}#面试中的高频追问
#追问1:Semaphore的permits可以动态调整吗?
public class DynamicPermits {
public void demo() {
Semaphore semaphore = new Semaphore(3);
// 释放后,permits就增加了
semaphore.release(); // permits变成4
// 没有直接"增加permits"的方法
// 只能通过release增加
}
}#追问2:Semaphore可以用于生产者-消费者吗?
可以,但更推荐用BlockingQueue:
// ✅ Semaphore实现
public class SemaphoreProducerConsumer {
private final Semaphore producerSem = new Semaphore(10);
private final Semaphore consumerSem = new Semaphore(0);
private final Object[] items = new Object[10];
public void put(Object item) throws InterruptedException {
producerSem.acquire();
// 放入item
consumerSem.release();
}
public Object take() throws InterruptedException {
consumerSem.acquire();
// 取出item
producerSem.release();
return null;
}
}
// ✅ BlockingQueue更简洁
public class BQProducerConsumer {
private final BlockingQueue<Object> queue =
new ArrayBlockingQueue<>(10);
public void put(Object item) throws InterruptedException {
queue.put(item);
}
public Object take() throws InterruptedException {
return queue.take();
}
}#追问3:Semaphore的公平性会影响性能吗?
非公平Semaphore吞吐量更高,但可能导致线程饥饿。 公平Semaphore保证FIFO,但需要额外的队列操作。
#【学习小结】
- Semaphore:控制同时访问资源的线程数量
- acquire():获取许可,阻塞直到成功
- release():释放许可,唤醒等待线程
- tryAcquire():非阻塞获取,立即返回
- tryAcquire(timeout):等待一定时间获取
- 公平 vs 非公平:公平保证FIFO,非公平吞吐量更高
- 使用场景:连接池、限流器、对象池
- 注意事项:acquire和release必须配对,在finally中释放
延伸阅读: