AbstractQueuedSynchronizer(抽象的队列式的同步器)简称AQS,AQS 中有一个非常重要的变量 state
(同步状态,也可以理解为资源),所有与同步相关的操作都是跟它有关的。它是用 volatile
修饰的,volatile
保证了内存可见性但并不能保证并发操作时的原子性,所以除了常规的get
和set
方法外还额外有一个 compareAndSetState()
方法,这个方法最终调用了一个 Unsafe
类的本地CAS
方法 compareAndSwapInt()
以保证操作的原子性。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如
Semaphore/CountDownLatch
)。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively()
:该线程是否正在独占资源。只有用到condition
才需要去实现它。tryAcquire(int)
:独占方式。尝试获取资源,成功则返回true
,失败则返回false
。tryRelease(int)
:独占方式。尝试释放资源,成功则返回true
,失败则返回false
。tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0
表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true
,否则返回false
。以
ReentrantLock
为例,state
初始化为0
,表示未锁定状态。A线程lock()
时,会调用tryAcquire()
独占该锁并将state+1
。此后,其他线程再tryAcquire()
时就会失败,直到A线程unlock()
到state=0
(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state
会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state
是能回到零态的。再以
CountDownLatch
以例,任务分为N个子线程去执行,state也
初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()
一次,state
会CAS
减1
。等到所有子线程都执行完后(即state=0
),会unpark()
主调用线程,然后主调用线程就会从await()
函数返回,继续后余动作。一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现
tryAcquire
-tryRelease
、tryAcquireShared
-tryReleaseShared
中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
本篇主要来看看同步器独占模式下的工作原理,独占模式下我们重点关注acquire(int)
和release(int)
这两个方法。
acquire(int)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里一共调用了四个方法:
tryAcquire()
:尝试获取同步状态,如果获取成功则直接返回;addWaiter()
:将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()
:让线程在队列中等待获取同步状态,直到获取返回。如果线程在期间被中断过,返回true
,否则返回false
;selfInterrupt()
:线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt()
,将中断补上;
tryAcquire(int)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这个方法是需要子类去实现的,这里在空实现中直接抛异常而不定义为抽象方法是为了方便子类仅实现其部分功能,某些情况下不需要调用该方法。
addWaiter(Node)
当前面一步tryAcquire()
未成功获取同步状态时,需要将当前线程加入等待列队尾部。
private Node addWaiter(Node mode) {
// 将当前线程构造成Node节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速更新尾结点,提升性能,将pred指向尾结点
Node pred = tail;
if (pred != null) {
// 尾结点不为空,将当前线程节点的前置节点指向尾结点
node.prev = pred;
// 并发执行时尾结点可能已经不是刚才的节点了,需要CAS更新尾结点指向当前线程节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾结点为空(当前线程节点是第一个入队的)或者上面快速更新失败,进入自旋更新尾结点
enq(node);
return node;
}
这里将当前线程构造成Node节点,此时如果尾结点不为空那么需要当前线程节点添加至队尾。先将当前节点的前置节点指向尾结点,而尾结点的更新可能涉及并发操作,需要本地CAS
方法 compareAndSetTail(pred, node)
以保证原子性。
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
操作成功之后将尾结点的后置节点指向当前节点,并返回当前节点。
如果尾结点为空,那么会以自旋的方式将当前线程节点加入队尾。
private Node enq(final Node node) {
// 以自旋的方式将当前线程节点加入队尾
for (;;) {
// t指向尾结点
Node t = tail;
if (t == null) { // Must initialize
// 尾结点为空,CAS设置head指向一个空节点
if (compareAndSetHead(new Node()))
// head设置成功将tail指向头(头尾一致)
tail = head;
} else {
// 尾结点不为空,将当前线程节点添加到队尾
node.prev = t;
// 跟上面快速更新是一样,CAS更新尾结点指向当前线程节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
自旋过程中先要以CAS的方式为队列设置一个空节点作为head
节点,之后将这个空的头结点指向尾结点(头尾一致)。然后再以CAS的方式将当前线程节点添加至队尾,也就是将队列的尾结点指向当前线程节点。
acquireQueued(Node, int)
同步状态获取失败,然后当前线程节点也成功加入到等待队列的尾部了,那么现在要做的事就是等待了。等到他前面的线程彻底释放释放资源后唤醒自己,自己再去拿到同步状态,然后就能做自己的事情了。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 等待过程中是否被中断过
boolean interrupted = false;
// 开始自旋等待
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是head节点,那么表示当前节点已经是老二了,此时便有资格去获取同步状态
if (p == head && tryAcquire(arg)) {
// 拿到同步状态了那就随便玩了,现将头结点指向自己,同时将自己的前置节点置为null
// 此时自己已经是老大了,之前的老大已经释放完资源出队了
setHead(node);
// 将next置为null,注释也说了方便GC回收之前的head节点
p.next = null; // help GC
failed = false;
// 返回中断标记,为true标识曾被中断过
return interrupted;
}
// 走到这里表示当前节点还未成为老二,或者自己是老二但老大还未彻底释放资源
// 那么需要检查节点状态并继续等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断结点是否中断过
parkAndCheckInterrupt())
// 返回中断标记,为true标识曾被中断过
interrupted = true;
}
} finally {
if (failed)
// 发生异常,标记该节点为取消状态
cancelAcquire(node);
}
}
这里又是一个自旋,如果当前线程节点不是老二或者是老二但未获取到同步状态,那么会判断节点的状态waitStatus
,该状态有5种情况。
CANCELLED
:值为1
,失效的节点,等待超时或被中断过,进入该状态后不会再变化;SIGNAL
:值为-1
,表示后续线程正在阻塞,当前节点完全释放资源后必须唤醒后续线程;CONDITION
:值为-2
,与Condition
相关,表示结点处于等待队列中,结点的线程等待在Condition
上,当其他线程调用了Condition
的signal()
方法后,该状态的结点将从等待队列转移到同步队列中,等待获取同步锁;PROPAGATE
:值为-3
,表示当前是一个共享节点,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态;0
:初始化状态;
第一步先检查状态,根据前置节点的状态来决定自己是否可以进入waiting
状态休息了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前置节点的状态决定后续节点的行为
if (ws == Node.SIGNAL)
// 如果前置节点为SIGNAL状态,那么自己就可以放心的去休息
return true;
if (ws > 0) {
// 如果前置节点不是有效节点(超时或者中断),继续往前查找节点,直到找到有效节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将自己加到找到的有效节点后面
pred.next = node;
} else {
// 如果前置节点不是SIGNAL状态,尝试设置其为SIGNAL状态
// 意思是我在你后边等着呢,你资源释放完了记得通知一下我,我先去休息了
// 该操作可能会失败,失败后会继续自旋
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这里根据前置节点的状态来决定自己的行为,如果前置节点已经是SIGNAL
,那么自己可以放心的进入waiting
状态去休息,然后等待前置节点来唤醒自己。此时返回true
,进入parkAndCheckInterrupt()
;
如果前置节点是CANCELLED
已取消的状态,那么会继续往前找,一直找到有效状态的节点,找到后把自己加到它后面。
如果前置节点有效单但不是SIGNAL
状态,那么会以CAS方式尝试将前置节点置为SIGNAL
状态。可能会设置失败,继续自旋。
第一次进入时,节点的状态为默认的0
,此时会进入最后一个分支中,设置完节点的状态为SIGNAL
之后返回false继续进入自旋,第二次进入时节点已经是SIGNAL
,此时返回true
,外层进入parkAndCheckInterrupt()
阻塞,这里才是真正的进入waiting
状态。
private final boolean parkAndCheckInterrupt() {
// 调用park()使线程进入waiting状态
LockSupport.park(this);
// 被唤醒后,检查自己是否被中断过,返回线程中断标记
return Thread.interrupted();
}
park()
会让当前线程进入waiting
状态。在此状态下,有两种途径可以唤醒该线程,被unpark()
或被interrupt()
。
回到最开始的acquire(int)
方法的if
判断里,在拿到同步状态后最终会返回当前线程的中断标记,如果中断标记为false
那么表示该节点成功获取资源能够做自己的事情了,再贴一遍这个方法。
public final void acquire(int arg) {
// 此处acquireQueued()方法返回的是中断标记,如果为true会进入selfInterrupt()中断当前线程
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果中断标记为true
,那么此处会进入selfInterrupt()
中断当前线程。
selfInterrupt()
再看selfInterrupt()
,这个静态方法作用就是中断当前线程。
static void selfInterrupt() {
// 中断当前线程
Thread.currentThread().interrupt();
}
release(int)
public final boolean release(int arg) {
// 同步状态完全释放才会进入下一个节点的唤醒流程
if (tryRelease(arg)) {
Node h = head;
// head节点不为空且状态不为0,唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
同样,tryRelease()
也是需要需要子类去实现的,直接调用会抛异常。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
因为是独占模式,线程没有阻塞那么肯定是拿到同步状态的,所以tryRelease()
通常都会调用成功。但是这里需要注意的是,必须要在已经彻底释放同步状态时才返回true
,否则放回false
。通常每次获取同步状态都会将state+1
(可重入),每次释放同步状态将state-1
,直到state=0
时才返回true
。然后才会进入unparkSuccessor()
真正释放自己,唤醒下一个节点。
unparkSuccessor(Node)
唤醒同步队列中的下一个有效节点
private void unparkSuccessor(Node node) {
// 获取当前节点状态
int ws = node.waitStatus;
// 如果有效,需要将节点状态设置为0,CAS操作
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 将s指向next节点
Node s = node.next;
// 如果next节点为空或失效,需要找到队列最前面的一个有效节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点开始遍历,如果遍历的节点不是当前节点,继续往前查找上一个节点
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果节点状态有效,将s指向该节点
if (t.waitStatus <= 0)
// 一路遍历最终s会指向队列最前面的一个有效的节点(不一定是head节点)
s = t;
}
// 如果s不为空,唤醒s指向的节点
if (s != null)
LockSupport.unpark(s.thread);
}
节点被唤醒后,parkAndCheckInterrupt()
从阻塞状态返回,唤醒的节点会继续在acquireQueued()
中自旋,再次贴上acquireQueued()
的代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 等待过程中是否被中断过
boolean interrupted = false;
// 开始自旋等待
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是head节点,那么表示当前节点已经是老二了,此时便有资格去获取同步状态
if (p == head && tryAcquire(arg)) {
// 拿到同步状态了那就随便玩了,现将头结点指向自己,同时将自己的前置节点置为null
// 此时自己已经是老大了,之前的老大已经释放完资源出队了
setHead(node);
// 将next置为null,注释也说了方便GC回收之前的head节点
p.next = null; // help GC
failed = false;
// 返回中断标记,为true标识曾被中断过
return interrupted;
}
// 走到这里表示当前节点还未成为老二,或者自己是老二但老大还未彻底释放资源
// 那么需要检查节点状态并继续等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 判断结点是否中断过
parkAndCheckInterrupt())
// 返回中断标记,为true标识曾被中断过
interrupted = true;
}
} finally {
if (failed)
// 发生异常,标记该节点为取消状态
cancelAcquire(node);
}
}
这个时候被唤醒的节点已经是队列中最前面的一个节点了,通常来讲会是head
节点。那么可以通过tryAcquire()
尝试获取同步状态了,获取成功便能返回退出自旋。
如果因为某些原因原来的head
节点甚至head
后面的某些结点被中断或放弃了,那么head
节点就不是有效节点了。这也没关系,下一轮的shouldParkAfterFailedAcquire()
方法会继续做出调整,直到将head
节点变成一个有效节点。这一套流程下来真是环环相扣,滴水不漏。
退出acquireQueued()
自旋之后acquire()
方法也返回了,相当于退出阻塞了,这像不像一个同步锁,没错还是一个可重入的同步锁。在ReentrantLock
中的lock()
方法,不管是公平锁还是非公平锁,最终就是调用了这个acquire(1)
。
public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() {
sync.lock();
}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
}
关于ReentrantLock
另开一篇仔细分析,这里不做深入。
AQS独占模式总结
acquire(int)
- 首先调用
tryAcquire()
获取同步状态,获取成功直接返回,这里的tryAcquire()
需要自定义同步器去实现; - 如果未获取到同步状态就
addWaiter()
将当前线程构造成Node
节点添加到同步队列尾部,并标记为独占模式,这里会有一个自旋,直到安全的将当前线程结点加入同步队列尾部; - 在
acquireQueued()
中会让当前节点在队列中休息,直到被唤醒且获取到同步状态后返回,期间如果被中断过返回true
,否则返回false
;
在这一步又有一个自旋,直到获取同步状态退出自旋返回,流程如下:
- 获取当前线程节点的前置节点,判断前置节点是否是
head
节点,如果是那说明自己已经排在队列第二的位置了,有资格获取同步状态,此时会尝试获取同步状态,获取成功退出自旋并返回中断标记; - 如果父节点不是
head
节点或者未成功获取同步状态,会检测前置节点的状态waitStatus
,如果前置节点状态为SIGNAL
,那么返回true
后自己就可以放心的去休息了,走parkAndCheckInterrupt()
进入waiting
状态;如果前置节点失效(waitStatus>0
),那么会继续向前查找前一个节点,直到找到有效节点为止;如果前置节点状态有效(waitStatus<=0
)但不为SIGNAL
,那么会尝试通过CAS
方式设置其状态为SIGNAL
,设置成功在下一轮循环就会返回true
进parkAndCheckInterrupt()
休息; - 处在
waiting
状态的节点,会在它前面的节点调用unpark()
或interrupt()
时候被唤醒,然后继续自旋直到获取到同步状态退出自旋返回;
- 线程在等待过程中被中断并没有立即响应,而是在获取到同步状态后再调用
selfInterrupt()
响应中断;
release(int)
- 唤醒原理相比就简单很多,因为已经拿到同步状态(资源)了,那么在当前节点彻底释放资源(
state=0
)的时候,它会唤醒队列最前面的一个节点来获取同步状态。 - 如果唤醒的节点并不是一个有效节点,那么会从队列尾部
tail
开始向前遍历,找到队列最靠前的一个有效节点,并将其设置为head
节点并将其唤醒。
本文结束,本篇是AQS独占模式流程学习,下一篇看看AQS共享模式。
参考:https://www.cnblogs.com/waterystone/p/4920797.html
参考:https://www.cnblogs.com/iou123lg/p/9464385.html
1 条评论
思路清晰,ReentrantLock和CountDownLatch也是用的AQS实现的,在concurrent包中中很多地方都用到自旋锁来实现的