Java 并发编程之 AQS


AQS,全称为 AbstractQueuedSynchronizer,是 JUC 包中的一个抽象类,为开发者提供了编写同步锁的机制
其中封装了 FIFO(first in first out) 的队列

AQS 的「范式」

继承 AQS 的实现 Sync 类都需要遵守一定的「范式」
一般来说,一个锁的实现,要么是独占式的,要么是共享式的,AQS 中需要开发者 override 以下的方法,但对于某一种锁实现(独占式/共享式)来说,只需要 override 其中的一对即可

方法 作用
boolean tryAcquire(int arg) 独占式尝试获取锁状态,返回值表示是否获取到锁
boolean tryRelease(int arg) 独占式尝试释放锁状态,返回值表示锁状态是否完全被释放而非是否释放成功,因为在可重入锁的情况下,释放后当前线程可能还持有者该所状态
int tryAcquireShare(int arg) 共享式尝试获取锁状态
boolean tryReleaseShare(int arg) 共享式尝试释放锁状态
boolean isHeldExclusively() 是否独占式的,一般返回 true 表示当前的锁状态被当前线程持有

关于独占式和共享式

独占式和共享式的区别在于,在同一个时刻是否能有多个线程获取所状态
顾名思义,独占式就是锁状态只能被一个线程获取到,而共享式可以被多个线程获取锁(例如读锁,可以多个线程同时读)

对于一个锁框架来说,需要做到

  1. 对锁状态的维护
  2. 对锁竞争时候,无法竞争到锁的线程的处理

而在 AQS 中,以上的两个问题是这样解决的

  1. AQS 提供一个 volatile int state 变量,用来标识锁的状态,并且提供了以下三个方法进行读和取
方法 作用
final int getState() 获取 state 的值
final void setState(int newState) 设置 state 的值
final boolean compareAndSetState(int expect, int update) 原子地修改 state 的值,返回值表示是否修改成功
  1. 而对于第二个问题,AQS 提供了一个 FIFO 的队列对无法获取到锁的线程进行入队等待的处理

在 AQS 中维护了一个 headtail 两个字段,其类型为 AQS 的内部类 NodeNode 的数据结构如下表格,可见通过 headtail 则构成了一个双向的链表

其结构大概表现为:

aqs_queue.png

字段 作用
int waitStatus 表示当前 Node 的等待状态,具体见下面的表格
Node prev 表示当前 Node 的前置节点
Node next 表示当前 Node 的后置节点
Thread thread 表示将这个 node 加入队列中的线程,在构造函数中赋值,并在Node使用完毕后会置为 null
Node nextWaiter 指向下一个在 condition 上等待的 Node,或者共享式的 Node

waitStatus:

枚举 含义
CANCELLED 1 表示线程获取锁的请求已经被取消
SIGNAL -1 表示线程在等待锁资源,也表示其后续的节点的线程在等待唤醒
CONDITION -2 表示线程正在等待 condition
PROPAGATE -3 表示下一个 acquireShared 应该无条件传播

独占式机制

在独占锁机制中,waitStatus 只会使用到 CANCELLED 和 SIGNAL 两个状态

lock.tryLock() 会调用 sync.tryAcquire(int arg) 方法,tryAcquire() 方法会通过 cas 的方式设置 state 的值,如果设置成功则返回 true,否则说明无法获取同步状态,则返回 false

lock.lock() 方法,调用 sync.acquire(int arg) 方法

tryAcquire() 方法:尝试获取锁(修改标志位),无论成功与否立即返回
acquire() 方法:获取锁(修改标志位),获取成功则返回,失败则进入队列等待,直到获取到锁

acquire

1
2
3
4
5
6

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

可见 acquire() 方法也会调用 tryAcquire() 方法先尝试获取同步状态,如果能获取到(即 tryAcquire() 返回 true),则不会走后续的 acquireQueued() 以及 selfInterrupt() 方法

如果无法获取同步状态(即 tryAcquire() 返回 false),调用 addWaiter() 方法以及 acquireQueued() 方法

addWaiter

接着看 addWaiter() 方法,顾名思义,添加一个等待节点,将新建的 Node 加入链表的队尾,并返回该节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Node addWaiter(Node mode) {
//先构造一个 Node 对象,包含当前线程对象,以及 mode (即传入的 Node.EXCLUSIVE 独占式)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//将 pred 指向 tail 节点
Node pred = tail;
if (pred != null) {
//如果 pred 不为空,即队尾有节点
//设置 node 的前置节点为 pred(即 tail)
node.prev = pred;
//通过 cas 将 node 设置为链表队尾
if (compareAndSetTail(pred, node)) {
//设置成功后,将 tail 的后置节点设置为 node,并返回
pred.next = node;
return node;
}
}
//走到这里说明 tail 为 null,即链表为空
//或者通过 cas 设置链表队尾失败,说明有多个线程在竞争设置队尾
enq(node);
return node;
}

接着看 enq() 方法

将 node 插入到链表的队尾,并返回 node 的前序节点

在 AQS 中,FIFO 队列中的 head 是个哨兵节点,当一个入队的线程获取到锁之后,会将自己对应的节点设置为头节点 head,并将 head 中的 prev 和 thead 都置空,独占式见 acquireQueued -> setHead,共享式见 doAcquireShared->setHeadAndPropagate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// t == null 说明还未初始化,则先创建一个哨兵节点并设置为 head,接着将 tail 指向哨兵节点
//即 head -> new Node() 0= <- tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将 node 插入到 tail 后,并组成双向链表后返回 node 的前序节点(即旧的 tail)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

再回到 acquire() 方法中 if 中的 acquireQueued() 方法

acquireQueued

acquireQueued 方法会建立一个死循环,不断地从队列中获取独占式的线程进行处理,方法的返回值代表是否需要中断该线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//死循环
//获取 node 的前序节点 p
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//如果 p 是头节点且成功获取锁状态
//将 node 设置为头节点并将原来的头节点 p 从链表中删除
setHead(node);
p.next = null; // help GC
failed = false;
//返回 interrupted
//这个死循环只有在这才会返回退出
//即只有当 node 的前续节点是头节点时
//并且当前线程尝试获取锁成功了才会退出循环
return interrupted;
}
//走到这里说明 p 不是头节点,或者是头节点但是尝试获取同步状态失败(tryAcquire 返回 false)
//这里会将获取不到锁的线程进行挂起,避免循环自旋造成 CPU 性能的无谓消耗
//shouldParkAfterFailedAcquire 返回的是该线程是否需要挂起
//parkAndCheckInterrupt 方法会将线程挂起,并且返回是否在挂起期间线程被中断了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire

node: 当前线程对应的节点
pred: 当前线程对应的节点 node 的前序节点

当一个线程尝试获取锁失败后,会调用这个方法判断是否需要挂起,方法返回值表示是否需要挂起该线程

这里讲一下 Node 中 waitStatus 这个字段的 SIGNAL 这个状态
SIGNAL 字段表示下一个节点处于挂起或者快要进行挂起的操作了,但这个状态不是节点给自己设置的,而是由后序节点修改的

在 AQS 中,当一个节点入队,说明自己需要等待锁,则会修改前序节点的 waitStatus 为 SIGNAL,代表「喂,我在你后面排队,等你处理好事情了叫醒我,我先睡会(挂起)」

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取 pred 节点的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前序节点的 waitStatus 已经为 Signal,返回 true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//如果前序节点的 waitStauts > 0 (即 CANCELLED)
//则往前遍历找到非 CANCELLED 状态的节点,并删除 CANCELLED 状态的节点
//最后 return false
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//走到这里说明前序节点的 waitStatus 为 0 或者 PROPAGATE(-3)
//则将前置节点的 waitStatus 设置为 SIGNAL
//最后 return false
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt

这个方法中通过 LockSupport.park() 方法将当前线程挂起,并阻塞在该行代码处,直到被唤醒响应
通过这个方法挂起的线程在被中断后,不会抛出 InterruptException 的异常

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

总结一下:

调用 acquire() 方法的线程首先会先尝试获取锁状态,如果获取成功,则执行后续代码,不表
主要是在多个线程竞争同一个锁时,存在竞争状态的情况下

调用 tryAcquire() 返回 false,即该线程获取锁状态不成功,则会在双向链表最后面插入一个 node 节点(持有该线程对象)
并通过 acquireQueued() 方法开启一个死循环

  1. 如果 node 节点的前序节点为 head 节点
    说明这个 node 节点已经排队排到最前面了,可以尝试获取锁状态,获取成功后则将该 node 节点置为 head,并跳出循环(跳出循环说明该线程已经获取到锁,可以执行其需要执行的代码)
    如果获取锁状态失败,则进入下一步判断该线程是否需要挂起

实际上在这个双向链表中,head 节点只是充当一个哨兵的作用,并没有其他作用

  1. 如果 node 节点的前序节点不为 head 节点,则判断该线程是否能够挂起
    如果当前节点的前置节点的 waitStatus 为 Signal,说明前面的节点也在等待中,那自己理所应当的就该挂起阻塞等待了
    如果当前节点的前置节点的 waitStatus 为 Canceled,则往前遍历并修改链表,跳过并删除 canceled 的节点
    否则,将当前节点的前置节点的 waitStatus 置为 Signal

  2. shouldParkAfterFailedAcquire() 方法中,除非 pred.waitStatus == Signal 则直接返回 true,会将当前线程挂起,否则会继续循环,「将 canceled 的节点删除」或者「将前序节点的 waitStatus 置为 Signal」,当完成「将前序节点的 waitStatus 置为 Signal」这一步(即compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) 后,下次循环如果 就会走到 pred.waitStatus == Signal 的 case 中返回 true,后续的 parkAndCheckInterrupt() 方法会将该线程挂起

  3. 如果一个节点是头节点的下一个节点,且头节点的线程还在占用着锁状态,则会不停地自旋去调用 tryAcquire() 方法尝试获取锁

中断机制

从前文我们知道,acquireQueued() 的返回值代表是否要中断线程,如果返回 true ,则会走到 if 的 case 中调用 selfInterrupt() 方法中断当前线程
而在 acquireQueued() 方法中,只有在 shouldParkAfterFailedAcquire()parkAndCheckInterrupt() 方法都返回 true 的情况下,才会将 interrupted 的值置为 true,并在循环结束的时候返回

回看一下 parkAndCheckInterrupt() 方法,这个方法中回调用 LockSupport.park(this) 将线程挂起,由于这个方法将线程挂起时,不同于 thread.wait() 和 Thread.sleep() ,通过 LockSupport.park() 方法将线程挂起期间,不会抛出中断异常,所以在被唤醒后,需要通过 Thread.interrupted() 方法的返回值来决定是否需要中断当前线程

Thread.interrupted() 返回的是线程是否被中断过,并清除中断状态

如果线程在等待过程中被中断过(thread.interrupt()) 则 Thread.interrupted() 会返回 true ,将 acquireQueued() 方法中的 interrupted 的值修改为 true ,直到循环退出,调用 selfInterrupt() 方法将线程中断

解锁过程

release

1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean release(int arg) {
//先尝试释放锁状态,由实现类自定义
if (tryRelease(arg)) {
//释放锁状态成功
Node h = head;
//头节点不为空且其 waitStatus 不为初始状态
if (h != null && h.waitStatus != 0)
//唤醒后序节点
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor

unparkSuccessor() 方法是为了唤醒 node 节点的后序节点对应的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//将头节点的 waitStatus 值置为 0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果头节点的后序节点为空,或者其 waitStatus 为 Canceled 状态,则将其删除
s = null;
//从链表的尾巴向前查找,找到链表中第一个 waitStatus < 0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//找到了链表中最靠前的 waitStatus < 0 的节点,将其唤醒
LockSupport.unpark(s.thread);
//唤醒后则会回到 parkAndCheckInterrupt 方法中被挂起阻塞的地方,继续执行后续的代码尝试获取锁状态等
}

共享式机制

在 AQS 中,共享式的方法都以 Shared 结尾,同样的,我们先来看 acquireShared() 方法

acquireShared

acquireShare 方法在共享模式中获取锁,会忽略掉中断,会先调用一次 tryAcquireShared 方法获取锁,成功后返回,否则,线程会进行入队等待

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

tryacquireShared

tryacquireShared 方法中直接抛出了异常,说明这是需要共享式锁的实现类自行实现的方法
返回值为 int 值,返回值是重点所在

1
2
3
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
返回值 意义
<0 表示尝试获取共享锁状态失败
0 表示获取共享锁状态成功,但不需要唤醒后序的处于等待共享锁的节点
>0 表示尝试获取共享锁状态成功,如果后序节点处于等待中,则需要将其唤醒

doAcquireShared

从上文知道,当尝试获取共享锁失败后,会调用 doAcquireShared 方法
这段代码和上文的独占式锁的机制很类似,也是将节点入队后,通过循环不断的获取队列中的线程进行处理
这里我们只看下不同的地方,即 setHeadAndPropagate 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
//新建一个 node 节点,并将其加入等待队列的队尾
//注意这里的 node 为 Node.SHARED
//所以创建的 node 对象中的 nextWaiter == Node.SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//r >= 0 说明获取共享锁成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

setHeadAndPropagate 方法用来将 node 设置为头结点,并根据 propagate 的值等条件判断是否进行传播
回顾一下独享锁,独享式锁是在 release() 方法中通过 unparkSuccessor() 唤醒后序节点起来获取锁后执行代码
而在共享式锁中,锁状态可以被多个线程所持有,所以当某个线程获取锁后,可以告知队列中的线程可以起来获取锁而不需要等到当前线程释放锁的时候再进行获取锁

当然也取决于 propagate 的值
propagate 即 tryAcquireShare() 方法的返回值

从代码中可见,当满足第一个 if 中的条件后,会获取 node.next(即头结点的后一个节点),如果其为空,即等待队列中没有等待的节点了,或者其为共享式的节点,则会调用 doReleaseShared() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void setHeadAndPropagate(Node node, int propagate) {
//先记录一下当前的 head 的引用
Node h = head; // Record old head for check below
//更新 node 为新的 head
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//propagate > 0 表示调用者告知信号量需要往后传播
//h == null || h.waitStauts < 0 说明旧的 head 为空或者为 Canceled 状态
//h = head == null || h.waitStauts < 0 说明新的的 head 为空或者为 Canceled 状态

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

共享锁的释放

在看 doReleaseShared 方法前,先来看一下 tryReleaseSharedreleaseShared 方法,有助于理解后面的 doReleaseShared 方法

tryReleaseShared

tryReleaseShared 方法需要子类自行实现
但规范了返回值代表「这次共享锁的释放是否需要唤醒后续等待的节点」,如果需要唤醒后续的节点则返回 true,否则返回 false

1
2
3
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

releaseShared

releaseShared 方法先调用 tryReleaseShared 方法尝试释放了锁,如果返回值为 true,则 调用 doReleaseShared 方法

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

doReleaseShared

顾名思义,做释放共享锁的事情
这里需要明白一个事情,就是 doReleaseShared 方法可能在同一时间有多个线程在访问
因为是共享锁,所以可能有的线程正在释放锁,有的线程刚获得锁成为头节点,需要唤醒后续节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;

//<1>
//判断 h 不为空(即头节点 head) 且 h!=tail(即链表至少有两个节点,才需要唤醒下一个节点)
if (h != null && h != tail) {
int ws = h.waitStatus;
//判断 h 的 waitStatus 是否为 SIGNAL
if (ws == Node.SIGNAL) {
//h 的 waitStaus 为 SIGNAL 则通过 cas 设置为 0,如果设置失败则进入下一个循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//如果 cas 修改 h 的 waitStatus 成功,则唤醒 h 的后序节点
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//如果 ws 为 0,说明 head 是刚刚成为了头结点,因为如果有后序节点进入
continue; // loop on failed CAS
}
//</1>
if (h == head) // loop if head changed
//只有满足 h == head 这个条件时才会退出循环
break;
}
}

这段代码有点难以理解

关于如何退出循环 if (h == head) 的判断

先来解释一下 if(h == head) 的这个判断,在 doReleaseShared 方法的死循环中,只有满足了这个条件才会退出循环,也就是说在 1—的这段代码执行期间,head 的引用没有发生改变,即没有新的节点成为头结点,则当前线程退出此死循环。
那什么情况下 head 会发生变化呢,答案自然是有其他的节点(可能是新加入来的节点)获取到了锁,会将 head 修改掉
举个例子,假设线程 A 调用了 doReleaseShared 方法, 此时 head == NodeA,在执行到 <1></1> 中的代码时,因为是共享锁,这时候另一个线程 B 尝试获取锁成功,将 head 修改为 NodeB,当线程 A 执行完 中间的代码后,发现 h != NodeA 了,则会继续进行循环,将此刻的头结点 head(即NodeB) 的后序节点唤醒。直到 h == head ,说明 head 在执行期间没有发生变化,说明已经完成了唤醒头节点后序节点的任务,那么就可以退出循环去做自己的事情了

也就是说这里如果有多个线程在同时执行的时候,多个线程都会帮助唤醒 head 节点的后序节点,这个思想就和 ConcurrentHashMap 中多个线程在 put 数据时,如果发现正在扩容,则会一起帮忙扩容,而不是傻傻的等待。妙啊

小结

至此,差不多将 AQS 中的重点源码都过了一遍,其他的方法大多大同小异,稍微看一下即可理解,不再赘述,读者阅读时若发现纰漏,望来信斧正,感谢

致谢

在学习 AQS 的过程中,从 B站寒食君日拱一兵ChiuCheng 中学到了很多知识,感谢

附上链接:

【Java并发】并发编程的意义是什么?月薪30K必知必会的Java AQS机制

Java AQS队列同步器以及ReentrantLock的应用

逐行分析AQS源码(3)——共享锁的获取与释放

作者

PPTing

发布于

2022-02-19

更新于

2022-02-24

许可协议

评论