Semaphore信号量

一个让候选人翻车的面试题

面试官问:"Semaphore是什么?能用来做什么?"

候选人小张说:"Semaphore是信号量,用来控制并发数量的。"

面试官追问:"具体怎么用?"

小张说:"acquire()获取信号量,release()释放信号量..."

面试官继续追问:"Semaphore的tryAcquire()和不带参数的acquire()有什么区别?"

小张愣了一下。

Semaphore是JDK并发包中最灵活的工具之一。它不仅仅是"限流"那么简单,理解了它的实现,才能真正用好它。

今天这篇文章,把Semaphore讲透。

什么是Semaphore

基本概念

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 创建信号量,许可数量为3
        Semaphore semaphore = new Semaphore(3);
        
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();  // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获取许可");
                    Thread.sleep(1000);
                    semaphore.release();  // 释放许可
                } catch (InterruptedException e) {}
            }, "Thread-" + i).start();
        }
    }
}

运行结果:同一时间只有3个线程在执行,其他线程等待。

与锁的区别

// 锁:互斥,同一时间只能一个线程持有
private final ReentrantLock lock = new ReentrantLock();

// 信号量:控制并发数,同一时间可以有多个线程持有
private final Semaphore semaphore = new Semaphore(3);

// Lock: 0/1二元信号量
// Semaphore: N元信号量,N可以是任意正整数

Semaphore的实现原理

基于AQS的共享模式

public class Semaphore {
    private final Sync sync;
    
    // 非公平Sync
    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }
        
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    
    // 公平Sync
    static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }
        
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 公平:检查队列
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || 
                    compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }
    
    // 核心获取逻辑
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);  // state = 许可数量
        }
        
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || 
                    compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
        
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {  // overflow
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
    }
}

acquire()的实现

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    if (tryAcquireShared(arg) < 0) {
        // 获取失败,加入等待队列
        doAcquireSharedInterruptibly(arg);
    }
}

release()的实现

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放成功,唤醒等待的线程
        doReleaseShared();
        return true;
    }
    return false;
}

公平 vs 非公平

非公平Semaphore

public class NonfairSemaphore {
    public void demo() {
        // 默认是非公平
        Semaphore semaphore = new Semaphore(3);
        
        // 非公平:可能插队
        // 新线程可能比队列中的线程先获取许可
    }
}

公平Semaphore

public class FairSemaphore {
    public void demo() {
        // 公平
        Semaphore semaphore = new Semaphore(3, true);
        
        // 公平:按FIFO顺序获取
        // 新线程必须等到队列前面的线程获取许可后才能获取
    }
}

使用场景选择

// 限流场景:一般用非公平,提高吞吐量
Semaphore unfairLimit = new Semaphore(100);

// 资源分配场景:可能需要公平,避免饥饿
Semaphore fair分配 = new Semaphore(3, true);

acquire的多种版本

不带参数的acquire

public class BasicAcquire {
    public void demo() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        // 阻塞获取许可,直到成功或被中断
        semaphore.acquire();
        
        try {
            // 业务逻辑
        } finally {
            semaphore.release();
        }
    }
}

带参数的acquire

public class MultiAcquire {
    public void demo() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        // 获取多个许可
        semaphore.acquire(2);  // 阻塞获取2个许可
        
        try {
            // 业务逻辑
        } finally {
            semaphore.release(2);  // 释放2个许可
        }
    }
}

tryAcquire()

public class TryAcquireDemo {
    public void demo() {
        Semaphore semaphore = new Semaphore(3);
        
        // 非阻塞获取,不等待
        if (semaphore.tryAcquire()) {
            try {
                // 获取成功
            } finally {
                semaphore.release();
            }
        } else {
            // 获取失败,立即返回
            System.out.println("获取失败,不等待");
        }
    }
}

tryAcquire with timeout

public class TimedAcquireDemo {
    public void demo() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        // 等待一定时间获取许可
        boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);
        
        if (acquired) {
            try {
                // 获取成功
            } finally {
                semaphore.release();
            }
        } else {
            // 超时,获取失败
            System.out.println("等待超时");
        }
    }
}

acquireUninterruptibly

public class UninterruptibleAcquire {
    public void demo() {
        Semaphore semaphore = new Semaphore(3);
        
        // 不可中断的获取
        semaphore.acquireUninterruptibly();
        
        try {
            // 业务逻辑
        } finally {
            semaphore.release();
        }
    }
}

生产中的实际应用

场景1:数据库连接池

public class ConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> connections;
    
    public ConnectionPool(int poolSize) {
        this.semaphore = new Semaphore(poolSize);
        this.connections = new ArrayList<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            connections.add(createConnection());
        }
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return connections.remove(connections.size() - 1);
    }
    
    public void releaseConnection(Connection conn) {
        connections.add(conn);
        semaphore.release();
    }
}

场景2:限流器

public class RateLimiter {
    private final Semaphore semaphore;
    private final int maxQPS;
    private final long interval;
    private long lastReset = System.currentTimeMillis();
    
    public RateLimiter(int maxQPS) {
        this.maxQPS = maxQPS;
        this.interval = 1000;  // 1秒
        this.semaphore = new Semaphore(maxQPS);
    }
    
    public boolean tryAcquire() {
        refreshPermits();
        return semaphore.tryAcquire();
    }
    
    public void acquire() throws InterruptedException {
        refreshPermits();
        semaphore.acquire();
    }
    
    private void refreshPermits() {
        long now = System.currentTimeMillis();
        if (now - lastReset >= interval) {
            semaphore.release(maxQPS - semaphore.availablePermits());
            semaphore.acquire(maxQPS);  // 重置
            lastReset = now;
        }
    }
}

场景3:对象池

public class ObjectPool<T> {
    private final Semaphore semaphore;
    private final PooledObjectFactory<T> factory;
    private final ConcurrentLinkedQueue<T> pool;
    
    public ObjectPool(int size, PooledObjectFactory<T> factory) {
        this.semaphore = new Semaphore(size);
        this.factory = factory;
        this.pool = new ConcurrentLinkedQueue<>();
        
        for (int i = 0; i < size; i++) {
            pool.offer(factory.create());
        }
    }
    
    public T borrow() throws InterruptedException {
        semaphore.acquire();
        T obj = pool.poll();
        if (obj == null) {
            obj = factory.create();
        }
        return obj;
    }
    
    public void release(T obj) {
        if (pool.offer(obj)) {
            semaphore.release();
        }
    }
    
    public interface PooledObjectFactory<T> {
        T create();
    }
}

场景4:信号量实现锁

public class SemaphoreAsLock {
    private final Semaphore semaphore = new Semaphore(1);
    
    public void lock() throws InterruptedException {
        semaphore.acquire();
    }
    
    public void unlock() {
        semaphore.release();
    }
    
    // 实际上等价于:
    // private final Semaphore semaphore = new Semaphore(1, true); // 公平锁
    // private final Semaphore semaphore = new Semaphore(1); // 非公平锁
}

Semaphore的高级用法

leases(许可租借)

public class LeasedSemaphore {
    public void demo() {
        // Semaphore没有内置的"自动归还"机制
        // 需要手动管理release
        
        // 推荐用法:try-finally
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire();
        try {
            // 业务逻辑
        } finally {
            semaphore.release();
        }
    }
}

draining(获取并清空)

public class DrainingSemaphore {
    public void demo() {
        Semaphore semaphore = new Semaphore(5);
        
        // 获取所有可用许可
        int available = semaphore.availablePermits();
        
        // drain permits,但不阻塞
        int drained = semaphore.drainPermits();
        System.out.println("获取了 " + drained + " 个许可");
    }
}

查询状态

public class SemaphoreStatus {
    public void demo() {
        Semaphore semaphore = new Semaphore(3);
        
        // 查看可用许可数
        System.out.println("可用许可: " + semaphore.availablePermits());
        
        // 查看是否有线程在等待
        // 需要通过hasQueuedThreads(),但Semaphore没有直接暴露
        // 需要继承或包装
        
        // 查看队列长度
        // Semaphore本身不暴露队列信息
    }
}

常见错误

忘记release

public class ForgetRelease {
    public void mistake() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire();
        
        if (someCondition()) {
            return;  // ❌ 忘记release,导致许可泄漏
        }
        
        semaphore.release();
    }
    
    // ✅ 正确做法
    public void correct() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire();
        try {
            if (someCondition()) {
                return;
            }
        } finally {
            semaphore.release();  // 在finally中释放
        }
    }
}

acquire和release不匹配

public class MismatchAcquireRelease {
    public void mistake() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire(2);  // 获取2个许可
        semaphore.release();    // ❌ 只释放1个,应该release(2)
        
        semaphore.release(2);   // ✅
    }
}

在异常中忘记release

public class ExceptionInAcquire {
    public void mistake() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire();
        
        doSomething();  // 如果这里抛出异常
        
        semaphore.release();  // 不会执行
    }
    
    // ✅ 正确做法
    public void correct() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        
        semaphore.acquire();
        try {
            doSomething();
        } finally {
            semaphore.release();
        }
    }
}

面试中的高频追问

追问1:Semaphore的permits可以动态调整吗?

public class DynamicPermits {
    public void demo() {
        Semaphore semaphore = new Semaphore(3);
        
        // 释放后,permits就增加了
        semaphore.release();  // permits变成4
        
        // 没有直接"增加permits"的方法
        // 只能通过release增加
    }
}

追问2:Semaphore可以用于生产者-消费者吗?

可以,但更推荐用BlockingQueue:

// ✅ Semaphore实现
public class SemaphoreProducerConsumer {
    private final Semaphore producerSem = new Semaphore(10);
    private final Semaphore consumerSem = new Semaphore(0);
    private final Object[] items = new Object[10];
    
    public void put(Object item) throws InterruptedException {
        producerSem.acquire();
        // 放入item
        consumerSem.release();
    }
    
    public Object take() throws InterruptedException {
        consumerSem.acquire();
        // 取出item
        producerSem.release();
        return null;
    }
}

// ✅ BlockingQueue更简洁
public class BQProducerConsumer {
    private final BlockingQueue<Object> queue = 
        new ArrayBlockingQueue<>(10);
    
    public void put(Object item) throws InterruptedException {
        queue.put(item);
    }
    
    public Object take() throws InterruptedException {
        return queue.take();
    }
}

追问3:Semaphore的公平性会影响性能吗?

非公平Semaphore吞吐量更高,但可能导致线程饥饿。 公平Semaphore保证FIFO,但需要额外的队列操作。

【学习小结】

  1. Semaphore:控制同时访问资源的线程数量
  2. acquire():获取许可,阻塞直到成功
  3. release():释放许可,唤醒等待线程
  4. tryAcquire():非阻塞获取,立即返回
  5. tryAcquire(timeout):等待一定时间获取
  6. 公平 vs 非公平:公平保证FIFO,非公平吞吐量更高
  7. 使用场景:连接池、限流器、对象池
  8. 注意事项:acquire和release必须配对,在finally中释放

延伸阅读