Java 并发编程之 AQS
AQS,全称为 AbstractQueuedSynchronizer,是 JUC 包中的一个抽象类,为开发者提供了编写同步锁的机制
其中封装了 FIFO(first in first out) 的队列
AQS 的「范式」
继承 AQS 的实现 Sync 类都需要遵守一定的「范式」
一般来说,一个锁的实现,要么是独占式的,要么是共享式的,AQS 中需要开发者 override 以下的方法,但对于某一种锁实现(独占式/共享式)来说,只需要 override 其中的一对即可
方法 | 作用 |
---|---|
boolean tryAcquire(int arg) | 独占式尝试获取锁状态,返回值表示是否获取到锁 |
boolean tryRelease(int arg) | 独占式尝试释放锁状态,返回值表示锁状态是否完全被释放而非是否释放成功,因为在可重入锁的情况下,释放后当前线程可能还持有者该所状态 |
int tryAcquireShare(int arg) | 共享式尝试获取锁状态 |
boolean tryReleaseShare(int arg) | 共享式尝试释放锁状态 |
boolean isHeldExclusively() | 是否独占式的,一般返回 true 表示当前的锁状态被当前线程持有 |
关于独占式和共享式
独占式和共享式的区别在于,在同一个时刻是否能有多个线程获取所状态
顾名思义,独占式就是锁状态只能被一个线程获取到,而共享式可以被多个线程获取锁(例如读锁,可以多个线程同时读)
对于一个锁框架来说,需要做到
- 对锁状态的维护
- 对锁竞争时候,无法竞争到锁的线程的处理
而在 AQS 中,以上的两个问题是这样解决的
- AQS 提供一个
volatile int state
变量,用来标识锁的状态,并且提供了以下三个方法进行读和取
方法 | 作用 |
---|---|
final int getState() | 获取 state 的值 |
final void setState(int newState) | 设置 state 的值 |
final boolean compareAndSetState(int expect, int update) | 原子地修改 state 的值,返回值表示是否修改成功 |
- 而对于第二个问题,AQS 提供了一个 FIFO 的队列对无法获取到锁的线程进行入队等待的处理
在 AQS 中维护了一个 head
和 tail
两个字段,其类型为 AQS 的内部类 Node
,Node
的数据结构如下表格,可见通过 head
和 tail
则构成了一个双向的链表
其结构大概表现为:
字段 | 作用 |
---|---|
int waitStatus | 表示当前 Node 的等待状态,具体见下面的表格 |
Node prev | 表示当前 Node 的前置节点 |
Node next | 表示当前 Node 的后置节点 |
Thread thread | 表示将这个 node 加入队列中的线程,在构造函数中赋值,并在Node使用完毕后会置为 null |
Node nextWaiter | 指向下一个在 condition 上等待的 Node,或者共享式的 Node |
waitStatus:
枚举 | 值 | 含义 |
---|---|---|
CANCELLED | 1 | 表示线程获取锁的请求已经被取消 |
SIGNAL | -1 | 表示线程在等待锁资源,也表示其后续的节点的线程在等待唤醒 |
CONDITION | -2 | 表示线程正在等待 condition |
PROPAGATE | -3 | 表示下一个 acquireShared 应该无条件传播 |
独占式机制
在独占锁机制中,waitStatus 只会使用到 CANCELLED 和 SIGNAL 两个状态
lock.tryLock()
会调用 sync.tryAcquire(int arg)
方法,tryAcquire()
方法会通过 cas 的方式设置 state 的值,如果设置成功则返回 true,否则说明无法获取同步状态,则返回 false
lock.lock()
方法,调用 sync.acquire(int arg)
方法
tryAcquire()
方法:尝试获取锁(修改标志位),无论成功与否立即返回acquire()
方法:获取锁(修改标志位),获取成功则返回,失败则进入队列等待,直到获取到锁
acquire
1 |
|
可见 acquire()
方法也会调用 tryAcquire()
方法先尝试获取同步状态,如果能获取到(即 tryAcquire() 返回 true),则不会走后续的 acquireQueued()
以及 selfInterrupt()
方法
如果无法获取同步状态(即 tryAcquire() 返回 false),调用 addWaiter()
方法以及 acquireQueued()
方法
addWaiter
接着看 addWaiter()
方法,顾名思义,添加一个等待节点,将新建的 Node 加入链表的队尾,并返回该节点
1 | private Node addWaiter(Node mode) { |
接着看 enq()
方法
将 node 插入到链表的队尾,并返回 node 的前序节点
在 AQS 中,FIFO 队列中的 head 是个哨兵节点,当一个入队的线程获取到锁之后,会将自己对应的节点设置为头节点 head,并将 head 中的 prev 和 thead 都置空,独占式见
acquireQueued -> setHead
,共享式见doAcquireShared->setHeadAndPropagate
1 | private Node enq(final Node node) { |
再回到 acquire()
方法中 if 中的 acquireQueued()
方法
acquireQueued
acquireQueued 方法会建立一个死循环,不断地从队列中获取独占式的线程进行处理,方法的返回值代表是否需要中断该线程
1 | final boolean acquireQueued(final Node node, int arg) { |
shouldParkAfterFailedAcquire
node
: 当前线程对应的节点pred
: 当前线程对应的节点 node 的前序节点
当一个线程尝试获取锁失败后,会调用这个方法判断是否需要挂起,方法返回值表示是否需要挂起该线程
这里讲一下 Node 中 waitStatus 这个字段的 SIGNAL 这个状态
SIGNAL 字段表示下一个节点处于挂起或者快要进行挂起的操作了,但这个状态不是节点给自己设置的,而是由后序节点修改的
在 AQS 中,当一个节点入队,说明自己需要等待锁,则会修改前序节点的 waitStatus 为 SIGNAL,代表「喂,我在你后面排队,等你处理好事情了叫醒我,我先睡会(挂起)」
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
parkAndCheckInterrupt
这个方法中通过 LockSupport.park(
) 方法将当前线程挂起,并阻塞在该行代码处,直到被唤醒响应
通过这个方法挂起的线程在被中断后,不会抛出 InterruptException 的异常
1 | private final boolean parkAndCheckInterrupt() { |
总结一下:
调用 acquire()
方法的线程首先会先尝试获取锁状态,如果获取成功,则执行后续代码,不表
主要是在多个线程竞争同一个锁时,存在竞争状态的情况下
调用 tryAcquire()
返回 false,即该线程获取锁状态不成功,则会在双向链表最后面插入一个 node 节点(持有该线程对象)
并通过 acquireQueued()
方法开启一个死循环
- 如果 node 节点的前序节点为 head 节点
说明这个 node 节点已经排队排到最前面了,可以尝试获取锁状态,获取成功后则将该 node 节点置为 head,并跳出循环(跳出循环说明该线程已经获取到锁,可以执行其需要执行的代码)
如果获取锁状态失败,则进入下一步判断该线程是否需要挂起
实际上在这个双向链表中,head 节点只是充当一个哨兵的作用,并没有其他作用
如果 node 节点的前序节点不为 head 节点,则判断该线程是否能够挂起
如果当前节点的前置节点的 waitStatus 为 Signal,说明前面的节点也在等待中,那自己理所应当的就该挂起阻塞等待了
如果当前节点的前置节点的 waitStatus 为 Canceled,则往前遍历并修改链表,跳过并删除 canceled 的节点
否则,将当前节点的前置节点的 waitStatus 置为 Signal在
shouldParkAfterFailedAcquire()
方法中,除非 pred.waitStatus == Signal 则直接返回 true,会将当前线程挂起,否则会继续循环,「将 canceled 的节点删除」或者「将前序节点的 waitStatus 置为 Signal」,当完成「将前序节点的 waitStatus 置为 Signal」这一步(即compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
) 后,下次循环如果 就会走到 pred.waitStatus == Signal 的 case 中返回 true,后续的parkAndCheckInterrupt()
方法会将该线程挂起如果一个节点是头节点的下一个节点,且头节点的线程还在占用着锁状态,则会不停地自旋去调用
tryAcquire()
方法尝试获取锁
中断机制
从前文我们知道,acquireQueued()
的返回值代表是否要中断线程,如果返回 true ,则会走到 if 的 case 中调用 selfInterrupt()
方法中断当前线程
而在 acquireQueued()
方法中,只有在 shouldParkAfterFailedAcquire()
和 parkAndCheckInterrupt()
方法都返回 true 的情况下,才会将 interrupted 的值置为 true,并在循环结束的时候返回
回看一下 parkAndCheckInterrupt()
方法,这个方法中回调用 LockSupport.park(this)
将线程挂起,由于这个方法将线程挂起时,不同于 thread.wait() 和 Thread.sleep() ,通过 LockSupport.park()
方法将线程挂起期间,不会抛出中断异常,所以在被唤醒后,需要通过 Thread.interrupted()
方法的返回值来决定是否需要中断当前线程
Thread.interrupted()
返回的是线程是否被中断过,并清除中断状态
如果线程在等待过程中被中断过(thread.interrupt()) 则 Thread.interrupted() 会返回 true ,将 acquireQueued() 方法中的 interrupted 的值修改为 true ,直到循环退出,调用 selfInterrupt() 方法将线程中断
解锁过程
release
1 | public final boolean release(int arg) { |
unparkSuccessor
unparkSuccessor()
方法是为了唤醒 node 节点的后序节点对应的线程
1 | private void unparkSuccessor(Node node) { |
共享式机制
在 AQS 中,共享式的方法都以 Shared
结尾,同样的,我们先来看 acquireShared()
方法
acquireShared
acquireShare
方法在共享模式中获取锁,会忽略掉中断,会先调用一次 tryAcquireShared 方法获取锁,成功后返回,否则,线程会进行入队等待
1 | public final void acquireShared(int arg) { |
tryacquireShared
tryacquireShared
方法中直接抛出了异常,说明这是需要共享式锁的实现类自行实现的方法
返回值为 int 值,返回值是重点所在
1 | protected int tryAcquireShared(int arg) { |
返回值 | 意义 |
---|---|
<0 | 表示尝试获取共享锁状态失败 |
0 | 表示获取共享锁状态成功,但不需要唤醒后序的处于等待共享锁的节点 |
>0 | 表示尝试获取共享锁状态成功,如果后序节点处于等待中,则需要将其唤醒 |
doAcquireShared
从上文知道,当尝试获取共享锁失败后,会调用 doAcquireShared
方法
这段代码和上文的独占式锁的机制很类似,也是将节点入队后,通过循环不断的获取队列中的线程进行处理
这里我们只看下不同的地方,即 setHeadAndPropagate
方法
1 | private void doAcquireShared(int arg) { |
setHeadAndPropagate
setHeadAndPropagate 方法用来将 node 设置为头结点,并根据 propagate 的值等条件判断是否进行传播
回顾一下独享锁,独享式锁是在 release()
方法中通过 unparkSuccessor()
唤醒后序节点起来获取锁后执行代码
而在共享式锁中,锁状态可以被多个线程所持有,所以当某个线程获取锁后,可以告知队列中的线程可以起来获取锁而不需要等到当前线程释放锁的时候再进行获取锁
当然也取决于 propagate 的值
propagate 即 tryAcquireShare() 方法的返回值
从代码中可见,当满足第一个 if 中的条件后,会获取 node.next(即头结点的后一个节点),如果其为空,即等待队列中没有等待的节点了,或者其为共享式的节点,则会调用 doReleaseShared() 方法
1 | private void setHeadAndPropagate(Node node, int propagate) { |
共享锁的释放
在看 doReleaseShared 方法前,先来看一下 tryReleaseShared
和 releaseShared
方法,有助于理解后面的 doReleaseShared 方法
tryReleaseShared
tryReleaseShared 方法需要子类自行实现
但规范了返回值代表「这次共享锁的释放是否需要唤醒后续等待的节点」,如果需要唤醒后续的节点则返回 true,否则返回 false
1 | protected boolean tryReleaseShared(int arg) { |
releaseShared
releaseShared
方法先调用 tryReleaseShared 方法尝试释放了锁,如果返回值为 true,则 调用 doReleaseShared
方法
1 | public final boolean releaseShared(int arg) { |
doReleaseShared
顾名思义,做释放共享锁的事情
这里需要明白一个事情,就是 doReleaseShared 方法可能在同一时间有多个线程在访问
因为是共享锁,所以可能有的线程正在释放锁,有的线程刚获得锁成为头节点,需要唤醒后续节点
1 | private void doReleaseShared() { |
这段代码有点难以理解
关于如何退出循环 if (h == head) 的判断
先来解释一下 if(h == head)
的这个判断,在 doReleaseShared 方法的死循环中,只有满足了这个条件才会退出循环,也就是说在 1—的这段代码执行期间,head 的引用没有发生改变,即没有新的节点成为头结点,则当前线程退出此死循环。
那什么情况下 head 会发生变化呢,答案自然是有其他的节点(可能是新加入来的节点)获取到了锁,会将 head 修改掉
举个例子,假设线程 A 调用了 doReleaseShared 方法, 此时 head == NodeA,在执行到 <1></1>
中的代码时,因为是共享锁,这时候另一个线程 B 尝试获取锁成功,将 head 修改为 NodeB,当线程 A 执行完 中间的代码后,发现 h != NodeA 了,则会继续进行循环,将此刻的头结点 head(即NodeB) 的后序节点唤醒。直到 h == head ,说明 head 在执行期间没有发生变化,说明已经完成了唤醒头节点后序节点的任务,那么就可以退出循环去做自己的事情了
也就是说这里如果有多个线程在同时执行的时候,多个线程都会帮助唤醒 head 节点的后序节点,这个思想就和 ConcurrentHashMap 中多个线程在 put 数据时,如果发现正在扩容,则会一起帮忙扩容,而不是傻傻的等待。妙啊
小结
至此,差不多将 AQS 中的重点源码都过了一遍,其他的方法大多大同小异,稍微看一下即可理解,不再赘述,读者阅读时若发现纰漏,望来信斧正,感谢
致谢
在学习 AQS 的过程中,从 B站寒食君、日拱一兵、ChiuCheng 中学到了很多知识,感谢
附上链接:
【Java并发】并发编程的意义是什么?月薪30K必知必会的Java AQS机制
Java 并发编程之 AQS