Semaphore是在生产当中比较常用的一个工具类,字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State。
下面用一个简单的例子来说明如何使用Semaphore来控制对共享资源的访问。
假设有一个共享的打印机资源,同时有多个线程需要使用该打印机来打印任务。但是我们希望同一时间只能有一个线程能够访问打印机,以避免打印输出的混乱。
首先,我们创建一个Semaphore对象,并将计数器初始化为1,表示打印机资源最多只能同时被一个进程/线程访问。
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore printerSemaphore = new Semaphore(1);
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
PrinterTask task = new PrinterTask("任务" + (i + 1), printerSemaphore);
threads[i] = new Thread(task);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PrinterTask implements Runnable {
private String taskName;
private Semaphore printerSemaphore;
public PrinterTask(String taskName, Semaphore printerSemaphore) {
this.taskName = taskName;
this.printerSemaphore = printerSemaphore;
}
@Override
public void run() {
try {
System.out.println(taskName + " 正在等待打印机...");
// 尝试获取信号量
printerSemaphore.acquire();
System.out.println(taskName + " 正在使用打印机打印...");
// 模拟打印任务需要的时间
Thread.sleep(1000);
System.out.println(taskName + " 打印完成.");
// 释放信号量
printerSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
任务1 正在等待打印机...
任务3 正在等待打印机...
任务2 正在等待打印机...
任务3 正在使用打印机打印...
任务4 正在等待打印机...
任务5 正在等待打印机...
任务3 打印完成.
任务2 正在使用打印机打印...
任务2 打印完成.
任务4 正在使用打印机打印...
任务4 打印完成.
任务5 正在使用打印机打印...
任务5 打印完成.
任务1 正在使用打印机打印...
任务1 打印完成.
在上述代码中,首先创建了一个printerSemaphore
对象,计数器初始值为1,表示只能有一个线程能够获取打印机资源。
然后,我们定义了一个PrinterTask类实现了Runnable接口,表示一个打印线程,run方法中实现打印任务。在run方法中,线程首先尝试获取信号量(printerSemaphore.acquire()
),如果计数器大于零,表示打印机空闲,线程获取到信号量,可以执行打印任务。在打印任务执行完毕后,它释放信号量(printerSemaphore.release()
),表示打印机资源可供其他线程使用。
最后,创建了5个线程,每个线程执行一个打印任务。当线程启动时,它们会尝试获取信号量,如果有可用的打印机资源,则开始执行打印任务,否则它们将等待直到有打印机资源可用。通过Semaphore的控制,同一时间只有一个线程能够获取到打印机资源,保证了打印任务的顺序和正确性。
接下来,从代码的层面看一下Semaphore能够实现控制访问特定资源线程数目的机制。
先从构造方法看起
Semaphore printerSemaphore = new Semaphore(1);
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
继续看sync是啥?
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
// 细节代码忽略 ...
}
protected final boolean tryReleaseShared(int releases) {
// ...
}
final void reducePermits(int reductions) {
// ...
}
final int drainPermits() {
// ...
}
}
有两个实现类,FairSync和NonfairSync,这里的参数permits就是代表的限定资源的个数,
继续往下看
NonfairSync(int permits) {
super(permits);
}
Sync(int permits) {
setState(permits);
}
这是AQS中的一个方法,看看state是什么?
protected final void setState(int newState) {
state = newState;
}
private volatile int state;
它是AQS中的一个属性,state
在Semaphore
中用于表示可用的许可数量,控制线程对资源的访问权限。通过增减 state
的值,Semaphore
实现了对同时访问资源的线程数量的控制
上面只传了一个permits的参数,这种默认采用的是非公平锁,也支持传入第二个参数,选择公平与非公平锁,比如:
Semaphore printerSemaphore = new Semaphore(1,true);
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
后面的构造方法都是调的同一个super中,这两种方式的区别也是在于,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
使用时:
// 尝试获取信号量
printerSemaphore.acquire();
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
先看下非公平锁的实现:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
看下的getState方法
protected final int getState() {
return state;
}
熟悉的state属性
private volatile int state;
释放过程:
// 释放信号量
printerSemaphore.release();
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
还是先看下非公平锁的实现:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
Semaphore(信号量)是一种用于实现多线程之间同步的机制。它是由一个计数器和一组等待队列组成。
在Semaphore中,计数器表示可用资源的数量。当一个线程想要访问某个资源时,它首先必须尝试获取一个信号量。如果信号量的计数器大于零,表示有可用资源,线程可以获取信号量,计数器减一,并继续执行临界区代码,访问特定资源。如果计数器为零,表示所有资源都被占用,线程必须等待,加入到等待队列中。
当某个线程使用完特定资源后,它释放信号量,计数器加一。如果等待队列中有其他线程正在等待信号量,其中一个线程将被唤醒,获取信号量并继续执行临界区代码。
Semaphore的原理可以用以下步骤概括:
- 初始化一个Semaphore对象,并设置计数器的初始值和等待队列为空。
- 当一个线程想要访问特定资源时,它尝试获取信号量。
- 如果信号量的计数器大于零,线程获取信号量,计数器减一,并继续执行临界区代码。
- 如果计数器为零,线程必须等待,加入到等待队列中。
- 当某个线程使用完特定资源后,它释放信号量,计数器加一。
- 如果等待队列中有其他线程正在等待信号量,其中一个线程将被唤醒,获取信号量并继续执行临界区代码。
通过使用Semaphore,可以实现对特定资源的访问控制,确保同一时间只有一个线程可以访问该资源,从而避免竞争条件和数据不一致性的问题。