【多线程06】AQS源码分析(互斥锁)

本文主要有以下内容:

  • 什么是AQS
  • Node类介绍
  • 进队过程
  • 出队过程

注:本文分析的jdk版本为JDK11,不同版本的jdk在实现上具有差异!

什么是AQS?

AQS全称是AbstractQueuedSynchronizer,是一个抽象类。为 Java 中的同步控制器如ReentrantLockCountDownLatchSemaphores等提供了一个基础框架,其内部包含一个 int 类型的state共享状态变量和一个FIFO的队列(由内部Node类实现的一个双向链表)。内部实现了对state属性的原子操作封装getState()setState()compareAndSetState(),支持独占锁和共享锁两种锁模式;内部封装了 Node 类用于构造CLH队列也就是前面所提到的FIFO队列。除此之外还实现了一个用于处理ConditionConditionObject内部类,不过这个只能用来独占锁的情况!

01_aqs原理图.png

在使用这个类时,由于内部实现了对state状态的封装操作,因此只需要实现如下几个方法即可

  • tryAcquire()
  • tryRelease()
  • tryAcquireShared()
  • tryReleaseShared()
  • isHeldExclusively()

上面的方法tryAcquire()、tryRelease()是独占锁获取和释放资源时需要实现的,tryAcquireShared()、tryReleaseShared()是实现共享锁才需要实现的!isHeldExclusively()是 Condition 才需要实现的。

以上方法在AQS类中默认抛出UnsupportedOperationException异常,这是因为这几个方法没有声明为abstract,这样我们只需要完成其中一组相关方法就可以实现加锁或解锁操作,当然也可以两种都实现如ReentrantLock类一样!

尽管内部是采用FIFO来进行管理各个线程节点的,但是没有采取强制FIFO策略,因此在实现独占锁这一类的同步控制器时,核心形式如下

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

共享锁也相似,但是需要关注级联唤醒操作!

Node类

没有获取到资源的线程会被封装为Node节点,其定义了如下几种状态用于描述线程,由于是是一个双向链表,因此还定义了 prev 和 next属性,用于指定前一个节 (predecessor) 点后继结点 (successor) 。除此之外还定义了其他属性如Thread thread(记录当前节点所对应的线程)、Node nextWaiter(用于指向在等待condition的节点或者共享模式)。

volatile Thread thread;/** 线程已取消获取所资源. */
static final int CANCELLED =  1;
/** 当前节点的前一个节点处于SIGNAL时,前一个节点释放资源时,负责唤醒该节点 */
static final int SIGNAL    = -1;
/** 处于一个condition队列,当值由-2转为0时,会被转换到等待队列即FIFO队列中. */
static final int CONDITION = -2;
/** 唤醒线程时,向后传播 用于共享锁模式下*/
static final int PROPAGATE = -3;
​
volatile int waitStatus;
​
volatile Thread thread;
Node nextWaiter;

waitStatus 的取值代表了当前节点的状态,其取值范围为[1,0,-1,-2,-3],就是上述定义的几种状态,默认情况下节点的waitiStatus的初始值为0。

进出队列的详细过程 [独占锁]

上面描述了AQS的基本数据结构,接下来就通过ReentrantLock分析进队和出队过程。该类实现了公平锁和非公平锁,独占锁和共享锁,在这里分析的是非公平锁且独占锁的情况。

其内部的抽象类Sync继承自AQS,而FairSyncNonfairSync分别表示公平锁和非公平锁的情况,二者主要的不同在于tryAcquire()方法上

ReentrantLock进队过程

在使用ReentrantLock进行同步资源控制时,一般是如下用法

// 默认创建的是非公平锁
public static ReentrantLock reentrantLock = new ReentrantLock();
​
@Override
public void run() {
    try {
        reentrantLock.lock();
        doSomething();
    } finally {
        reentrantLock.unlock();
    }
}

ReentrantLocklock()方法定义如下:

// ReentrantLock.java
public void lock() {
    sync.acquire(1);
}


sync 是内部类 Sync 类型的一个属性,Sync 继承了 AQS,而acquire()的定义来自 AQS.java

// AbstractQueuedSynchronizer.java

public final void acquire(int arg) {

    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


首先分析tryAcquire(arg)方法,由于我们创建的是非公平锁,因此需要到内部类 NonfairSync去查看具体的是实现方法:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


​
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }

    }

    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

重点分析nonfairTryAcquire(int acquires),其步骤如下

  • 获取到当前线程
  • 通过getState()获取到当前共享变量的状态,如果是0,则表示没有被其他线程占据,因此通过CAS【compareAndSetState(0, acquires)】操作改变state的状态,将自己设置为占据临界资源的线程。即可以去执行同步控制的代码!即上面的doSomething()
  • 如果不为0,但是进入临界资源的线程是当前线程,则继续增加state的值,这里也是ReentrantLock可重入的体现,如果增加导致溢出了int的取值范围,则抛出new Error("Maximum lock count exceeded");
  • 如果都不属于上述情况的话,则返回false

02_tryAquire成功.png

上面是获取成功的过程较为简单;接着看看获取失败的情况!如果没有获取到锁资源则进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)逻辑。

首先是addWaiter()方法

// AQS.java
private Node addWaiter(Node mode) {
    Node node = new Node(mode);
​
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }

    }

}

首先创建一个节点,在for循环中,首先拿到旧的Tail节点,如果这个节点为 null,则表明CLH队列还没有初始化,反之则通过CAS操作让他进入到队列中去。

初始化队列的方法如下

private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

即创建了一个新的节点且让headtail属性都指向这个新建的节点,然后再进入 oldTail != null的条件。进入后首先通过操作让节点指向上一个 Tail 节点,然后通过compareAndSetTail(oldTail, node)cas操作将新的节点设置为新的 Tail 节点

private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}

到目前为止,我们是让这个没有进入临界资源的线程进入了队列中!梳理一下整个流程,如下图所示

03_小结01_addWaiter.png

此时CLH队列的变化情况如下两张图所示:

没有初始化时

04_队列未初始化情况.png
队列已初始化

05_队列已初始化.png

此时这种状态是:队列已经入队了,但是我们回到这里

// AbstractQueuedSynchronizer.java

public final void acquire(int arg) {

    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


如果只是简单的入队,那么为什么还需要调用acquiredQueued(node)方法呢?小小的脑袋,大大的疑惑,那就来看看这个方法!

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }

}

在这个方法里,首先通过node.predecessor();拿到传入节点的前一个节点,也就是上一个 Tail 节点,因为addWaiter()返回的节点是新建的节点且在入队列时,已经将 Tail 指针指向了最后的一个节点!

如果前一个节点是头节点,则尝试去进入临界资源,如果成功则把这个新节点设置为头节点,

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

过程如下图

06_前置节点为head节点.png

如果不是 head 节点或者 tryAcquire失败则执行if (shouldParkAfterFailedAcquire(p, node))方法,那么这个方法又是做什么的?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    if (ws > 0) {
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

首先拿到 前一个节点 的 waitStatus 属性值,如果是 SIGNAL = -1则返回 true,根据注释的内容可以看到在前一个节点的状态为SIGNAL,即前一个节点释放资源时,负责唤醒刚加入的节点,如果前一个节点的状态为CANCELLED = 1,则节点的prev指针会向前移动直到节点的waitStatus的值小于等于0为止。如果前一个节点的状态不是 -1,1,那么将通过原子操作将状态置为 -1 即SIGNAL。而返回 true 时,我们会进入parkAndCheckInterrupt()方法,其代码如下

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}


即阻塞当前线程!,而返回 false 则会进入下一次for循环,至少再来一次 这个流程!也就是说只有在前一个节点的状态为SIGNAL时,新加入的 Tail 节点才能够阻塞!过程如下图所示:

前节点SIGNAL

07_前置节点为signal.png

前节点 为CANCELLED 时

08_前置节点为cancel时.png
至此线程入队并阻塞的过程完毕:整个过程可以看作如下!

09_小结节点入队并阻塞过程.png

出队过程[独占锁]

同样从reentrannt.unlock()开始分析

// ReentranitLock.java
public void unlock() {
    sync.release(1);
}


// AQS.java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }

    return false;
}

首先看tryRealse(arg)

// tryRealse(arg) 被子类重写 ReentrantLock.java
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

首先拿到状态变量的值,判断当前在临界资源的线程是否为之前获得临界资源的线程,如果不是抛出异常,因为是独占锁,所以只有当state = 0 时在释放资源!

回到release()方法,当tryRelease()返回 true 时,进入 if 条件中去。满足条件为h != null && h.waitStatus != 0 时,才唤醒后继结点,

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }

    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor()中,如果waitStatus < 0成立,先把其置为0,然后获取下一个节点,如果说下一个节点的waitStatus > 0即处于CANCEL,则 从后往前寻找一个waitStats <= 0的节点,然后唤醒其线程!

这里需要注意的是,head节点在出队时,其waitStatus 被置为了0,其次当head节点的下一个节点处于cancel状态时,是从后往前寻找的!在结合入队寻找阻塞点时,是否会有如下疑问?

为什么需要把前一个节点的waitStatus置为 SIGNAL?放在自己身上不可以吗?

为什么是从后往前找?而不是从前往后找?

首先是为什么从后往前找?回到addWiter()方法,在node.setPrevRelaxed(oldTail);这里我们知道这是一个原子操作,是将新的节点prev指向 tail 节点,然后通过cas操作将Tail 指向新的节点。再执行oldTail.next = node;可以看到这不是一个原子操作,很有可能在执行node.setPrevRelaxed(oldTail);后,当前线程失去调度,然后其他线程得到执行,在这种情况下假如之前的节点发生了中断或者由于超时取消,即在acquireQueued()中进入cancelAcquire(),即

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // 往前寻找其他已经取消的节点,使其出队!
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 拿到 prev的next节点,作为后续的cas操作的参数的expect值
    Node predNext = pred.next;
    // 当前结点置于 CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 如果这个节点是尾节点,则删除自己并把 tail往前移动
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // 如果不是尾节点,则需要把后后继节点连接到处于SIZGNAL的前节点上
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            //如果pre是头节点或者状态设置失败或者其线程为null则直接唤醒后继结点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

此时 在如下这种情况下,会使得pred最终指向node,从而使得 node.next = node 导致链表断开,后续线程无法被唤醒!

10_节点cancel.png

这就是为什么从后往前找的原因!

红节点后面的节点什么时候将他的prev指向红节点之前的节点?因为我们线程的阻塞是在acquireQueued()的无限for循环里面,因此当线程被唤醒唤醒之后。会再至少循环一次,因此他会再次进入shouldParkAfterFailedAcquire()方法里面!

11_指向前置节点.png

接下来看SIGNAL的问题!

首先回答放在自己身上是可以的,不仅可以我们还能够取消SIGNAL这个状态,在新版的JDK源码中,如JDK14Node节点类中就没有SIGNAL属性了。只不过 JDK11 中是这样写的。

为啥这样写?我猜测是因为在cancelAcquire方法中,当节点被CANNELLED后,后继节点在没有执行shouldParkAfterFailedAcquire()时,还是能够被访问到,如果头节点线程释放资源,要去唤醒其他线程,需要遍历链表,而又是从后往前遍历,在遍历节点时,需要根据各个状态判断是否唤醒,比较耗时,所以为了提高效率,把前一个节点的置为SIGNAL,这样直接唤醒后继节点就可以!另外在JDK11的版本里,新节点入队之前并不会删除已经被取消的节点,而在jdk14中,新节点入队之前会清除之前已经处于CANCELLED的线程!

// JDK14 AQS.java
final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    // 省略其他代码
    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled 清除前面已经CANCELLED的节点
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        // 省略其他代码
        return cancelAcquire(node, interrupted, interruptible);
    }

private void cleanQueue() {
    for (;;) {                               // restart point
        for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
            if (q == null || (p = q.prev) == null)
                return;                      // end of list
            if (s == null ? tail != q : (s.prev != q || s.status < 0))
                break;                       // inconsistent
            if (q.status < 0) {              // cancelled
                if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                    q.prev == p) {
                    p.casNext(q, s);         // OK if fails
                    if (p.prev == null)
                        signalNext(p);
                }
                break;
            }
            if ((n = p.next) != q) {         // help finish
                if (n != null && q.prev == p) {
                    p.casNext(n, q);
                    if (p.prev == null)
                        signalNext(p);
                }
                break;
            }
            s = q;
            q = q.prev;
        }
    }
}

其二是在unparkSuccessor()里面可以看到当头节点为0时,并不会去唤醒后继节点,因此只有是 -1 的时候线程才能够放心的park(),否则可能出现永久park()的情况。

private void unparkSuccessor(Node node) {

 
    int ws = node.waitStatus;
    // 小于 0 说明后继节点需要被唤醒
    if (ws < 0)
        // 被唤醒之后,需要把自己置为0,这样后继节点在竞争失败时,会在把他置为 -1
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    // s 状态为非空,或者其为取消状态,说明 s 是无效节点,此时需要执行 if 里的逻辑
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找!
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

参考资料

Java并发AQS详解

1.5w字,30图带你彻底掌握 AQS!

AQS深入理解 shouldParkAfterFailedAcquire源码分析 状态为0或PROPAGATE的情况分析

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MYJyOkss' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片