Java API 提供了一种信号量机制 Semaphore 。 一个信号量就是一个计数器, 可用于保护对一个或多个共享资源的访问。
当一个线程要访问多个共享资源中的一个时,它首先需要获得一个信号量。如果信号量内部的计数器的值大于 0 ,那么信号量就递减计数器并允许线程访问。计数器的值大于 0 意味着存在可用的空闲资源,所以线程能够访问并使用这些资源中的一个。 如果计数器的值为 0 ,信号量会让线程休眠,直到计数器的值大于 0 。计数器的值为 0 意味着所有共享资源都被其他线程占用了,所以当前想要使用资源的线程,必须等待其中一个资源被释放 。
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
这是 Semaphore 中定义的一个抽象内部类 Sync ,用于实现信号量的同步机制。该类继承了 AbstractQueuedSynchronizer ,并重写了其中的一些方法,同时提供了一些新的方法来实现 Semaphore 的不同操作。
其中,Sync 类有一个整型变量 state ,用于表示当前 Semaphore 中的可用许可数量。每当一个线程获取了一个许可时,state 的值减 1 ,每当一个线程释放了一个许可时,state 的值加 1 。
Sync 类中定义了以下方法:
其中,许可的获取和释放操作是通过对 state 进行原子操作来实现的。
在代码实现中,nonfairTryAcquireShared() 方法使用了自旋操作,不断检查 state 的值,直到当前线程成功获取许可或者其他线程释放许可后,才退出自旋。
reducePermits() 方法是用于减少 Semaphore 许可数量的,它使用自旋方式将当前 state 值减去指定数量的许可,并且对减去后的结果进行检查,以确保结果值不会小于 0 。
在减少许可的时候,也使用了原子操作,以保证多个线程同时调用 reducePermits() 方法时不会导致并发问题。
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
这段代码用于公平模式下获取许可证。先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新 state 的值。
hasQueuedPredecessors ()判断当前线程是否在等待队列中。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
这段代码的实现思路比较复杂,我们先来了解一点 AQS 队列的结构和细节。在 AQS 队列中,每个等待线程都被包装成一个 Node 对象,这些 Node 对象通过 next 指针形成了一个 FIFO (先进先出)队列。Node 节点中包含了线程本身以及一些状态信息,如前继节点、后继节点等。等待队列的头结点( head )是当前持有许可证的线程所对应的节点,等待队列的尾节点( tail )是最后一个正在等待的线程所对应的节点。
再来接着分析 hasQueuedPredecessors()的实现逻辑,首先它会先获取等待队列中的头节点和尾节点,然后判断头节点和尾节点是否相同。如果相同,说明当前线程是第一个在等待队列中的线程,返回 false 。否则,获取头节点的下一个节点,判断下一个节点的线程是否为当前线程,如果是,则说明当前线程在等待队列中,返回 true ,否则返回 false 。
限流是一种在分布式系统中常用的流量控制方式,可以防止因流量过大而导致的系统崩溃、服务降级等问题。一般情况下,限流是通过限制并发请求、请求速率等方式来实现的,比如使用令牌桶、漏桶等算法。在网关层进行限流可以减轻后端服务的压力,保证服务的可用性和稳定性。而在某些场景下,也可以在应用程序中自己实现限流,比如秒杀场景中限制并发请求的数量,避免过多的请求导致系统崩溃。
下面我们使用 Semaphore 实现一个简单的限流功能:
public class SemaphoreTest {
public static final Semaphore SEMAPHORE = new Semaphore(100);
public static final AtomicInteger failCount = new AtomicInteger(0);
public static final AtomicInteger successCount = new AtomicInteger(0);
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(()->seckill()).start();
}
}
public static boolean seckill() {
if (!SEMAPHORE.tryAcquire()) {
System.out.println("no permits, count="+failCount.incrementAndGet());
return false;
}
try {
// 处理业务逻辑
Thread.sleep(2000);
System.out.println("seckill success, count="+successCount.incrementAndGet());
} catch (InterruptedException e) {
// todo 处理异常
e.printStackTrace();
} finally {
SEMAPHORE.release();
}
return true;
}
}
鑫茂,深圳,Java 开发工程师,2022 年 3 月参加工作。
喜读思维方法、哲学心理学以及历史等方面的书,偶尔写些文字。
希望通过文章,结识更多同道中人。