AbstractQueuedSynchronized
AbstractQueuedSynchronized
简称 AQS,这个类是整个并发包的基础工具类, ReentrantLock、CountDownLatch、Semaphore、FutureTask 等并发工具类底层都是通过它来实现的
AQS 定义了两种资源共享的方式:
- Exclusive:独占式,只有一个线程能获取资源并执行,比如 ReentrantLock。
- Share:共享式,多个线程获取资源,多个线程可以同时执行,比如 CountDownLatch,ReentrantReadWriteLock 的 ReadLock 等
AQS 结构
属性
主要的就是这三个 volatile 修饰的 Node 对象,还有一些对应的偏移量(用于 CAS 的)
/** |
Node 节点
static final class Node { |
ReentrantLock 分析
我们知道 ReentrantLock 内部有两个锁,一个是公平锁 (FairSync)🔒,一个是非公平锁 (NonFairSync)🔒,这两个锁都是独占锁,两者实现的差异其实并不大,我们先从公平锁
开始说起。
static final class FairSync extends Sync { |
Lock()
🔔 这里我们为了模拟真实的情况,我们假设有两个线程Thread0
和Thread1
过来执行了Lock()
方法,且Thread0
比Thread1
要先执行。
Lock()
方法中调用了父类的acquire(1)
acquire()
public final void acquire(int arg) { |
首先尝试 tryAcquire(1),这个 tryLock() 在 AQS 中没有具体实现是交给子类去实现的,所以这里就会调用 FairSync 的,tryAquire(1)
tryAquire()
protected final boolean tryAcquire(int acquires) { |
hasQueuedPredecessors()
公平锁和非公平锁的 tryAcquire 方法区别就在这里,这个方法就是判断有没有前驱节点(不包含头节点 head,也就是)存在,有的话为了保证公平性就是需要等待,返回 true。
public final boolean hasQueuedPredecessors() { |
💡 到这里我们分析下Thread0
和Thread1
的执行情况
🔸 首先Thread0
先执行了tryAcquire(1)
没有任何阻碍,执行成功直接 retrurn
🔸 Thread1
此时有多种情况:
还没有获取 state ,
Thread0
执行完后获取 State==1 ,由于是独占锁直接 return false ,获取锁失败。已经
getState()==0
了,执行hasQueuedPredecessors
方法,注意,此时 head 和 tail 都还没有初始化,都还是 null(官方的注释中也提到 head 和 tail 是 lazily initialized )所以这里会直接 return false,然后继续执行 CAS,由于前面Thread0
已经将 state 设置为了 1 ,所以这里 CAS 肯定失败了,最终Thread1
的 tryAcquire 失败返回 false。
🔸 既然tryAcquire()
失败了,那Thread1
就会转头继续执行后面的方法
public final void acquire(int arg) { |
首先执行的就是AddWaiter()
addWaiter()
这个方法的作用就是将 Thread 和 mode 包装成 Node 然后添加到链表尾部然后返回这个 Node
private Node addWaiter(Node mode) { |
💡 根据前面分析 head 和 tail 都还没有初始化都还是 null,所以这里会直接进入 enq()
方法
enq()
private Node enq(final Node node) { |
🔸 可以看到这里是一个循环,因为 head 和 tail 都还是空的所以这里进入第一个循环,CAS 设置一个空的 Node() 为头节点 head,然后将 tail 也指向这个 head,到这里 head 和 tail 才算是初始化完成了 (lazily initialized )。
🔸 循环,进入 else,这里就将当前 node 连接到 tail 后面并且利用 CAS 自旋设置 tail 为当前 node 也就是包含Thread1
的 node,然后 return 当前节点的前驱节点(这里返回值并没有用到)。
public final void acquire(int arg) { |
下一步就是执行acquireQueued()
❓ 为什么不直接在构造器里面就初始化头节点 head?而要采用懒加载的方式?
CLH queues need a dummy header node to get started. Butwe don’t create them on construction, because it would be wasted effort if there is never contention. Instead, the nodeis constructed and head and tail pointers are set up on first contention.
以上摘自** Doug Lea** 大师的注释解释,根据我们的上面的分析,其实我们也看到了,第一个线程Thread0
过来的时候并没有去初始化 head,后面的线程Thread1
过来的时候 有了竞争才初始化了这个头节点 head,如果直接初始化这个 head,然后又没有锁竞争,这个 head 节点就被浪费了。 大师就是大师,太强了 😮
acquireQueued()
final boolean acquireQueued(final Node node, int arg) { |
💡 因为前置节点是 head,所以这里作为队列第一个可以去尝试获取锁,可以看到这里是个死循环,会一直尝试获取锁,其实类似与 CAS 的自旋,但是相比 CAS 自旋又有很大不同,它并不会一直自旋,详细可以继续往下看。
🔸 前面传递过来的 node 前继节点正好就是 head,所以执行 tryAcquire() 但是由于Thread0
还没有释放锁所以这里仍然失败。
🔸 进入第二个 if 执行 shouldParkAfterFailedAcquire(p,node)
shouldParkAfterFailedAcquire()
看名字就知道是干啥的了,获取失败是否 Park?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
注意这个里面回剔除不正常的节点,为下面的唤醒操作考虑
💡 分析Thread1
,我们先看看当前队列的状态
🔸 因为前面的操作并没有对 state 进行操作,所以这里会直接进入最后的 else,设置前驱节点的状态为 SIGNAL
然后 renturn false 回到 acquireQueued 的内循环
🔸 再次尝试获取锁,Thread0
仍然没有释放锁,失败,再次进入 shouldParkAfterFailedAcquire,这一次由于已经将前继节点的状态设置为SIGNAL
所以直接 return true,进入后面的 parkAndCheckInterrupt()
方法
此时状态变为
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { |
到这里我们的线程Thread1
就会被阻塞住,也不会继续自旋获取锁了。
LockSupport.park() 实际上调用的是 Unsafe 提供的指令属于
线程阻塞原语
,可以理解为二元信号量(只有一个 permit),这个方法也会响应 Interrupt 参考
🔔 假设此时又有一个线程Thread2
过来了,并且Thread0
依然没有释放锁
🔸 tryAcquire 失败
🔸addWaiter(), 将Thread2
和模式包装成 Node 添加到队尾(这个时候就不会进入 enq 了,因为 tail 此时为Thread1
已经不为空了)然后返回包含Thread2
的节点,队列状态变为:
🔸acquireQueued(),根据上面的状态图,这里前置节点并不是 head,直接进入 shouldParkAfterFailedAcquire()
🔸shouldParkAfterFailedAcquire(),明显前驱节点状态并不是SIGNAL
而是 0,所以直接利用 CAS 设置前驱节点为SIGNAL
状态变为:
回到 acquireQueued()
继续自旋
🔸 前置节点不是 head,调用 shouldParkAfterFailedAcquire(NodeT1,mode),成功,调用 parkAndCheckInterrupt 阻塞,至此,Thread1
,Thread2
都在这里 park 住了。
unLock()
🔔 继续上面的模拟,此时Thread0
释放锁 ,调用unlock()
方法,unlock() 会调用 AQS 中的release(1)
方法
release()
public final boolean release(int arg) { |
首先执行tryRelease(1)
,和tryAcquire()
一样,这个方法最后是交给子类去实现的
tryRelease()
protected final boolean tryRelease(int releases) { |
tryRelease() 成功继续执行后面的语句,看一下当前 AQS 队列的状态
🔸 头节点 head ! =null 并且 head.waitState 不是 0,执行 unparkSuccessor().
unparkSuccessor()
private void unparkSuccessor(Node node) { |
🔸 Thread1
被unpark()
继续执行。
private final boolean parkAndCheckInterrupt() { |
返回Thread1
的 中断标志位,并复原为 false,结束parkAndCheckInterrupt()
方法,再次回到acquireQueued()
的循环中执行第一个 if
for (;;) { |
🔸 由于Thread0
已经释放锁,同步状态已经变为 0,Thread1
可以直接tryAcquire
获取到锁,然后设置头节点为当前节点,将之前的 head 节点移除,返回中断状态,由于之前 park 期间没有被中断直接return false
,acquire 成功!!! Thread1
获得锁!!!
❓ 为什么要从后往前遍历?
这里看了一些博客介绍,大概有两个说法,一个是在
enq()
方法里面,先设置的node.prev = pred;
再执行的 CAS 最后执行的t.next = node;
CAS 成功后 next 也许还没有设置成功,从前往后遍历有可能找不到这个刚加入的节点;其次,在cancelAcquire(node);
的最后一步有一个node.next=node
的操作,如果这个时候从前往后遍历会导致死循环。
❓ 从后往前遍历找到最前面第一个 waitStatus<0 的节点,这个操作如果返回的是个中间节点怎么办?
不要怕,我们继续执行,首先它是个中间节点而且是公平锁,它有前驱节点,unpark 后肯定获取不到锁(公平锁需要检测是否有前驱节点),然后执行
shouldParkAfterFailedAcquire()
,还记得这个方法里面的一个操作么??如果前驱节点状态>0,他就会清除这些不正常的节点,返回 false,不 park 自旋,下一次循环这个节点在获取锁就可以获取到了,妙哉!!!
注意** setHead **是这样的
private void setHead(Node node) { |
此时队列状态变为:
再往后就是重复前面的过程啦。
非公平锁公平锁区别
上面是介绍的公平锁,所谓的公平就是先来后到 FIFO。
我们来看一下非公平锁的 lock 和 nonfairTryAcquire() 的实现,这两个锁的区别其实就是这两个方法。
lock()
final void lock() { |
可以看到非公平锁lock()
的时候,不管三七二十一先 CAS 试一下能不能获取到锁,获取到就直接返回
nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) { |
对比公平锁的实现,会发现少了hasQueuedPredecessors()
这个方法,所以如果前面lock()
的时候没有 CAS 成功,到这里后如果之前持有锁的线程释放了锁,它又会再次尝试 CAS 获取锁,这里其实就体现了非公平锁的特点,**先等待锁的线程不一定能先获取到锁,中间允许有人"插队"
**,如果这一次还是失败了,就会和公平锁一样老老实实去等待队列中排队
一般而言,非公平锁的性能会比公平锁好,而非公平锁可能会导致排在后面的线程饥饿
流程图
参考
《Java 并发编程之美》
一行一行源码分析清楚 AbstractQueuedSynchronizer
AbstractQueuedSynchronizer 源码剖析(六)- 深刻解析与模拟线程竞争资源
[浅谈 Java 并发编程系列(八)—— LockSupport 原理剖析]