本文主要有以下内容:
- 什么是AQS
- Node类介绍
- 进队过程
- 出队过程
注:本文分析的jdk版本为JDK11,不同版本的jdk在实现上具有差异!
什么是AQS?
AQS全称是AbstractQueuedSynchronizer
,是一个抽象类。为 Java 中的同步控制器如ReentrantLock
、CountDownLatch
、Semaphores
等提供了一个基础框架,其内部包含一个 int 类型的state共享状态变量和一个FIFO的队列(由内部Node类实现的一个双向链表)。内部实现了对state属性的原子操作封装getState()
、setState()
、compareAndSetState()
,支持独占锁和共享锁两种锁模式;内部封装了 Node 类用于构造CLH队列也就是前面所提到的FIFO队列。除此之外还实现了一个用于处理Condition
的ConditionObject
内部类,不过这个只能用来独占锁的情况!
在使用这个类时,由于内部实现了对state状态的封装操作,因此只需要实现如下几个方法即可
- tryAcquire()
- tryRelease()
- tryAcquireShared()
- tryReleaseShared()
- isHeldExclusively()
上面的方法tryAcquire()、tryRelease()
是独占锁获取和释放资源时需要实现的,tryAcquireShared()、tryReleaseShared()
是实现共享锁才需要实现的!isHeldExclusively()
是 Condition 才需要实现的。
以上方法在AQS类中默认抛出UnsupportedOperationException
异常,这是因为这几个方法没有声明为abstract
,这样我们只需要完成其中一组相关方法就可以实现加锁或解锁操作,当然也可以两种都实现如ReentrantLock
类一样!
尽管内部是采用FIFO来进行管理各个线程节点的,但是没有采取强制FIFO策略,因此在实现独占锁这一类的同步控制器时,核心形式如下
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
共享锁也相似,但是需要关注级联唤醒操作!
Node类
没有获取到资源的线程会被封装为Node节点,其定义了如下几种状态用于描述线程,由于是是一个双向链表,因此还定义了 prev 和 next属性,用于指定前一个节 (predecessor) 点后继结点 (successor) 。除此之外还定义了其他属性如Thread thread
(记录当前节点所对应的线程)、Node nextWaiter
(用于指向在等待condition的节点或者共享模式)。
volatile Thread thread;/** 线程已取消获取所资源. */
static final int CANCELLED = 1;
/** 当前节点的前一个节点处于SIGNAL时,前一个节点释放资源时,负责唤醒该节点 */
static final int SIGNAL = -1;
/** 处于一个condition队列,当值由-2转为0时,会被转换到等待队列即FIFO队列中. */
static final int CONDITION = -2;
/** 唤醒线程时,向后传播 用于共享锁模式下*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Thread thread;
Node nextWaiter;
waitStatus 的取值代表了当前节点的状态,其取值范围为[1,0,-1,-2,-3],就是上述定义的几种状态,默认情况下节点的waitiStatus的初始值为0。
进出队列的详细过程 [独占锁]
上面描述了AQS的基本数据结构,接下来就通过ReentrantLock分析进队和出队过程。该类实现了公平锁和非公平锁,独占锁和共享锁,在这里分析的是非公平锁且独占锁的情况。
其内部的抽象类Sync继承自AQS,而FairSync
和NonfairSync
分别表示公平锁和非公平锁的情况,二者主要的不同在于tryAcquire()
方法上
ReentrantLock进队过程
在使用ReentrantLock进行同步资源控制时,一般是如下用法
// 默认创建的是非公平锁
public static ReentrantLock reentrantLock = new ReentrantLock();
@Override
public void run() {
try {
reentrantLock.lock();
doSomething();
} finally {
reentrantLock.unlock();
}
}
在 ReentrantLock
中 lock()
方法定义如下:
// ReentrantLock.java
public void lock() {
sync.acquire(1);
}
sync
是内部类 Sync
类型的一个属性,Sync
继承了 AQS
,而acquire()
的定义来自 AQS.java
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先分析tryAcquire(arg)
方法,由于我们创建的是非公平锁,因此需要到内部类 NonfairSync
去查看具体的是实现方法:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
重点分析nonfairTryAcquire(int acquires)
,其步骤如下
- 获取到当前线程
- 通过getState()获取到当前共享变量的状态,如果是0,则表示没有被其他线程占据,因此通过CAS【compareAndSetState(0, acquires)】操作改变state的状态,将自己设置为占据临界资源的线程。即可以去执行同步控制的代码!即上面的doSomething()
- 如果不为0,但是进入临界资源的线程是当前线程,则继续增加state的值,这里也是ReentrantLock可重入的体现,如果增加导致溢出了int的取值范围,则抛出
new Error("Maximum lock count exceeded");
- 如果都不属于上述情况的话,则返回false
上面是获取成功的过程较为简单;接着看看获取失败的情况!如果没有获取到锁资源则进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
逻辑。
首先是addWaiter()
方法
// AQS.java
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
首先创建一个节点,在for
循环中,首先拿到旧的Tail节
点,如果这个节点为 null
,则表明CLH队列还没有初始化,反之则通过CAS操作让他进入到队列中去。
初始化队列的方法如下
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
即创建了一个新的节点且让head
和tail
属性都指向这个新建的节点,然后再进入 oldTail != null
的条件。进入后首先通过操作让节点指向上一个 Tail
节点,然后通过compareAndSetTail(oldTail, node)
cas操作将新的节点设置为新的 Tail
节点
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
到目前为止,我们是让这个没有进入临界资源的线程进入了队列中!梳理一下整个流程,如下图所示
此时CLH队列的变化情况如下两张图所示:
没有初始化时
队列已初始化
此时这种状态是:队列已经入队了,但是我们回到这里
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果只是简单的入队,那么为什么还需要调用acquiredQueued(node)
方法呢?小小的脑袋,大大的疑惑,那就来看看这个方法!
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
在这个方法里,首先通过node.predecessor();
拿到传入节点的前一个节点,也就是上一个 Tail 节点,因为addWaiter()返回的节点是新建的节点且在入队列时,已经将 Tail 指针指向了最后的一个节点!
如果前一个节点是头节点,则尝试去进入临界资源,如果成功则把这个新节点设置为头节点,
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
过程如下图
如果不是 head 节点或者 tryAcquire
失败则执行if (shouldParkAfterFailedAcquire(p, node))
方法,那么这个方法又是做什么的?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
首先拿到 前一个节点 的 waitStatus 属性值,如果是 SIGNAL = -1
则返回 true,根据注释的内容可以看到在前一个节点的状态为SIGNAL
,即前一个节点释放资源时,负责唤醒刚加入的节点,如果前一个节点的状态为CANCELLED = 1
,则节点的prev指针会向前移动直到节点的waitStatus的值小于等于0为止。如果前一个节点的状态不是 -1,1
,那么将通过原子操作将状态置为 -1 即SIGNAL
。而返回 true 时,我们会进入parkAndCheckInterrupt()
方法,其代码如下
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
即阻塞当前线程!,而返回 false 则会进入下一次for循环,至少再来一次 这个流程!也就是说只有在前一个节点的状态为SIGNAL
时,新加入的 Tail 节点才能够阻塞!过程如下图所示:
当 前节点 为SIGNAL
时
当 前节点 为CANCELLED 时
至此线程入队并阻塞的过程完毕:整个过程可以看作如下!
出队过程[独占锁]
同样从reentrannt.unlock()
开始分析
// ReentranitLock.java
public void unlock() {
sync.release(1);
}
// AQS.java
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先看tryRealse(arg)
// tryRealse(arg) 被子类重写 ReentrantLock.java
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先拿到状态变量的值,判断当前在临界资源的线程是否为之前获得临界资源的线程,如果不是抛出异常,因为是独占锁,所以只有当state = 0 时在释放资源!
回到release()
方法,当tryRelease()
返回 true 时,进入 if 条件中去。满足条件为h != null && h.waitStatus != 0
时,才唤醒后继结点,
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找节点
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在unparkSuccessor()
中,如果waitStatus < 0
成立,先把其置为0
,然后获取下一个节点,如果说下一个节点的waitStatus > 0
即处于CANCEL
,则 从后往前寻找一个waitStats <= 0的节点,然后唤醒其线程!
这里需要注意的是,head节点在出队时,其waitStatus 被置为了0
,其次当head节点的下一个节点处于cancel状态时,是从后往前寻找的!在结合入队寻找阻塞点时,是否会有如下疑问?
为什么需要把前一个节点的waitStatus
置为 SIGNAL
?放在自己身上不可以吗?
为什么是从后往前找?而不是从前往后找?
首先是为什么从后往前找?回到addWiter()
方法,在node.setPrevRelaxed(oldTail);
这里我们知道这是一个原子操作,是将新的节点prev指向 tail 节点,然后通过cas操作将Tail 指向新的节点。再执行oldTail.next = node;
可以看到这不是一个原子操作,很有可能在执行node.setPrevRelaxed(oldTail);
后,当前线程失去调度,然后其他线程得到执行,在这种情况下假如之前的节点发生了中断或者由于超时取消,即在acquireQueued()
中进入cancelAcquire()
,即
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 往前寻找其他已经取消的节点,使其出队!
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 拿到 prev的next节点,作为后续的cas操作的参数的expect值
Node predNext = pred.next;
// 当前结点置于 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果这个节点是尾节点,则删除自己并把 tail往前移动
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// 如果不是尾节点,则需要把后后继节点连接到处于SIZGNAL的前节点上
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
//如果pre是头节点或者状态设置失败或者其线程为null则直接唤醒后继结点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
此时 在如下这种情况下,会使得pred最终指向node,从而使得 node.next = node 导致链表断开,后续线程无法被唤醒!
这就是为什么从后往前找的原因!
红节点后面的节点什么时候将他的prev指向红节点之前的节点?因为我们线程的阻塞是在acquireQueued()
的无限for循环里面,因此当线程被唤醒唤醒之后。会再至少循环一次,因此他会再次进入shouldParkAfterFailedAcquire()
方法里面!
接下来看SIGNAL
的问题!
首先回答放在自己身上是可以的,不仅可以我们还能够取消SIGNAL这个状态,在新版的JDK源码中,如JDK14
,Node
节点类中就没有SIGNAL
属性了。只不过 JDK11
中是这样写的。
为啥这样写?我猜测是因为在cancelAcquire
方法中,当节点被CANNELLED
后,后继节点在没有执行shouldParkAfterFailedAcquire()
时,还是能够被访问到,如果头节点线程释放资源,要去唤醒其他线程,需要遍历链表,而又是从后往前遍历,在遍历节点时,需要根据各个状态判断是否唤醒,比较耗时,所以为了提高效率,把前一个节点的置为SIGNAL
,这样直接唤醒后继节点就可以!另外在JDK11的版本里,新节点入队之前并不会删除已经被取消的节点,而在jdk14
中,新节点入队之前会清除之前已经处于CANCELLED
的线程!
// JDK14 AQS.java
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
// 省略其他代码
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled 清除前面已经CANCELLED的节点
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
// 省略其他代码
return cancelAcquire(node, interrupted, interruptible);
}
private void cleanQueue() {
for (;;) { // restart point
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
break; // inconsistent
if (q.status < 0) { // cancelled
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
q.prev == p) {
p.casNext(q, s); // OK if fails
if (p.prev == null)
signalNext(p);
}
break;
}
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
p.casNext(n, q);
if (p.prev == null)
signalNext(p);
}
break;
}
s = q;
q = q.prev;
}
}
}
其二是在unparkSuccessor()
里面可以看到当头节点为0时,并不会去唤醒后继节点,因此只有是 -1 的时候线程才能够放心的park()
,否则可能出现永久park()
的情况。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 小于 0 说明后继节点需要被唤醒
if (ws < 0)
// 被唤醒之后,需要把自己置为0,这样后继节点在竞争失败时,会在把他置为 -1
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// s 状态为非空,或者其为取消状态,说明 s 是无效节点,此时需要执行 if 里的逻辑
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找!
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
参考资料
AQS深入理解 shouldParkAfterFailedAcquire源码分析 状态为0或PROPAGATE的情况分析