AbstractQueuedSynchronizer

AQS (AbstractQueuedSynchronizer) 抽象类的队列式同步器

J.U.C是基于AQS实现的,AQS是一个同步器,设计模式是模板模式
核心数据结构:双向链表 + state(锁状态)
底层操作:CAS

原理概览

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

img.png

基础定义

AQS 是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效的构造出来。

为线程的同步和等待等操作提供一个基础模板类。尽可能多的实现可重入锁,读写锁同步器所有需要的功能。队列同步器内部实现了线程的同步队列,独占或是共享的获取方式等,使其只需要少量的代码便可以实现目标功能。 一般来说,AQS的子类应以其他类的内部类的形式存在,然后使用代理模式调用子类和AQS本身的方法实现线程的同步。
也就是说,使用ReentrantLock举例,外界调用ReentrantLock,ReentrantLock内部定义Sync,Sync是AQS的子类,在ReentrantLock的内部实现中调用Sync的方法,最后完成最终的功能,当然ReentrantLock内部稍复杂,又加入和公平锁和非公平锁。

抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。
AbstractQueuedSynchronizer,这个类也是在java.util.concurrent.locks下面。

AQS,全名AbstractQueuedSynchronizer,是一个抽象类的队列式同步器,它的内部通过维护一个状态volatile int state(共享资源),一个FIFO线程等待队列来实现同步功能。

state

所有通过AQS实现功能的类都是通过修改state的状态来操作线程的同步状态。比如在ReentrantLock中,一个锁中只有一个state状态,当state为0时,代表所有线程没有获取锁,当state为1时,代表有线程获取到了锁。通过是否能把state从0设置成1,当然,设置的方式是使用CAS设置,代表一个线程是否获取锁成功。

state用关键字volatile修饰,代表着该共享资源的状态一更改就能被所有线程可见,而AQS的加锁方式本质上就是多个线程在竞争state,当state为0时代表线程可以竞争锁,不为0时代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,这些线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

  • AQS内部维护一个线程的队列。队列由内部的节点组成。 队列的节点为Node,节点分为SHARED和EXCLUSIVE分别时共享模式的节点和独占模式的节点

而这个等待队列其实就相当于一个CLH队列,用一张原理图来表示大致如下:
在这里插入图片描述

AQS简介中提到了AQS内部维护着一个FIFO队列,该队列就是CLH同步队列。

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next)

AQS支持两种资源分享的方式:

  • Exclusive(独占,只有一个线程能执行,如ReentrantLock)
  • Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

自定义的同步器继承AQS后,只需要实现共享资源state的获取和释放方式即可,其他如线程队列的维护(如获取资源失败入队/唤醒出队等)等操作,AQS在顶层已经实现了,

AQS代码内部提供了一系列操作锁和线程队列的方法,主要操作锁的方法包含以下几个:

  • compareAndSetState():利用CAS的操作来设置state的值-
  • tryAcquire(int):独占方式获取锁。成功则返回true,失败则返回false。
  • tryRelease(int):独占方式释放锁。成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式获取锁。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式释放锁。如果释放后允许唤醒后续等待结点返回true,否则返回false。

ReentrantLock就是实现了自定义的tryAcquire-tryRelease,从而操作state的值来实现同步效果。

除此之外,AQS内部还定义了一个静态类Node,表示·CLH队列的每一个结点·,该结点的作用是对每一个等待获取资源做了封装,包含了需要同步的线程本身、线程等待状态…..

我们可以看下该类的一些重点变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class Node {
/** 表示共享模式下等待的Node */
static final Node SHARED = new Node();
/** 表示独占模式下等待的mode */
static final Node EXCLUSIVE = null;
/** 下面几个为waitStatus的具体值 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
/** 表示前面的结点 */
volatile Node prev;
/** 表示后面的结点 */
volatile Node next;
/**当前结点装载的线程,初始化时被创建,使用后会置空*/
volatile Thread thread;
/**链接到下一个节点的等待条件,用到Condition的时候会使用到*/
Node nextWaiter;
}

代码里面定义了一个表示当前Node结点等待状态的字段waitStatus,该字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这五个值代表了不同的特定场景:

  • CANCELLED:表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL(记住这个-1的值,因为后面我们讲的时候经常会提到)
  • CONDITION:表示结点等待在Condition上,当其他线程调用了Condition的SIGNAL()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。(注:Condition是AQS的一个组件,后面会细说)
  • PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

也就是说,当waitStatus为负值表示结点处于有效等待状态,为正值的时候表示结点已被取消

在AQS内部中还维护了两个Node对象head和tail,一开始默认都为null

1
2
private transient volatile Node head; 
private transient volatile Node tail;

讲完了AQS的一些基础定义,我们就可以开始学习同步的具体运行机制了,为了更好的演示,我们用ReentrantLock作为使用入口,一步步跟进源码探究AQS底层是如何运作的,这里说明一下,因为ReentrantLock底层调用的AQS是独占模式,所以下文讲解的AQS源码也是针对独占模式的操作

独占模式

ReentrantLocksynchronized功能类似,使用AQS的独占模式,只有一个线程可以获取锁。
在这里插入图片描述

加锁过程

我们都知道,ReentrantLock的加锁和解锁方法分别为lock()和unLock(),我们先来看获取锁的方法,

1
2
3
4
5
6
final void lock(){
if(compareAndSetState(0,1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

其中compareAndSetState(0, 1)
如果返回true就代表着之前state是0,也就是当前无线程获取锁,同时当前线程获取锁成功了,将独占线程设置为当前线程。 如果是false就代表当前有线程占用,当前占用的线程有2个可能

  • 当前线程在占用,因为是可重入锁,之后同样会获取锁
  • 其他线程在占用,在其他线程占用期间,当前线程需要等待

逻辑很简单,线程进来后直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程,否则就调用acquire(1)再次尝试获取锁。

我们假定有两个线程A和B同时竞争锁,A进来先抢占到锁,此时的AQS模型图就类似这样:
在这里插入图片描述

acquire
1
2
3
4
public final void acquire(int arg){
if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}

acquire是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。

acquire方法是一种互斥模式,且忽略中断。该方法至少执行一次tryAcquire(int)方法,如果tryAcquire(int)方法返回true,则acquire直接返回,否则当前线程需要进入队列进行排队。

acquire(1)包含整个获取锁,如果获取不到就等待的操作

acquire包含了几个函数的调用,

  • tryAcquire:尝试直接获取锁,如果成功就直接返回;
  • addWaiter:获取不到锁时,说明有其他线程目前正在占用锁, 将该线程加入等待队列FIFO的尾部,并标记为独占模式;
  • acquireQueued:线程阻塞在等待队列中获取锁,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • selfInterrupt:自我中断,就是既拿不到锁,又在等待时被中断了,线程就会进行自我中断selfInterrupt(),将中断补上。

我们一个个来看源码,并结合上面的两个线程来做场景分析。

tryAcquire 就是为了再次尝试获取锁

在tryAcquire(arg)中是尝试获取锁,是由ReentrantLock提供的,逻辑比较简单

当前无线程占有锁时,即state为0时,获取锁; 当前有线程占有锁,但当前占有锁的线程是当前线程时,因为ReentrantLock是可重入锁,获取锁,并把state+1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected final boolean tryAcquire(int acquires){
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires){
final Thread current=Thread.currentThread();
int c=getState();
if(c==0){
if(compareAndSetState(0,acquires)){
setExclusiveOwnerThread(current);
return true;
}
}else if(current==getExclusiveOwnerThread()){
//当前占有锁的线程是当前线程时,因为ReentrantLock是可重入锁,获取锁,并把state+1
int nextc=c+acquires;
if(nextc< 0)
// overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

当线程B进来后,nonfairTryAcquire方法首先会获取state的值,如果为0,则正常获取该锁,不为0的话判断是否是当前线程占用了,是的话就累加state的值,这里的累加也是为了配合释放锁时候的次数,从而实现可重入锁的效果。

当然,因为之前锁已经被线程A占领了,所以这时候tryAcquire会返回false,继续下面的流程。

addWaiter

获取不到锁时,说明有其他线程目前正在占用锁,将当前线程包装成节点放入同步队列 将该线程加入等待队列FIFO的尾部,并标记为独占模式;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
先尝试快速入队,如果入队成功直接返回,如果失败(存在竞态)就使用cas反复入队直到成功为止
**/
private Node addWaiter(Node mode){
Node node=new Node(Thread.currentThread(),mode);
// Try the fast path of enq; backup to full enq on failure
// //快速入队
Node pred=tail;
if(pred!=null){
node.prev=pred;
if(compareAndSetTail(pred,node)){
pred.next=node;
return node;
}
}
enq(node);
return node;
}

这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待队列中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部,然后返回当前结点的前驱结点,

enq

用于将当前节点插入等待队列,如果队列为空,则初始化当前队列。整个过程以CAS自旋的方式进行,直到成功加入队尾为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node enq(final Node node){
// CAS"自旋",直到成功加入队尾
for(;;){
Node t=tail;
if(t==null){ // 队列为空,初始化一个Node结点作为Head结点,并将tail结点也指向它
if(compareAndSetHead(new Node()))
tail=head;
}else{ // 把当前结点插入队列尾部
node.prev=t;
if(compareAndSetTail(t,node)){
t.next=node;
return t;
}
}
}
}

第一遍循环时,tail指针为空,初始化一个Node结点,并把head和tail结点都指向它,然后第二次循环进来之后,tail结点不为空了,就将当前的结点加入到tail结点后面,也就是这样:

作者:D糊涂小天才z https://www.bilibili.com/read/cv8757383/ 出处:bilibili
如果此时有另一个线程C进来的话,发现锁已经被A拿走了,然后队列里已经有了线程B,那么线程C就只能乖乖排到线程B的后面去,
在这里插入图片描述

入队完成之后再判断一次当前是否有可能获得锁,也就是前一个节点是head的话, 前一个线程有可能已经释放了,再获取一次,如果获取成功,设置当前节点为头节点,整个获取过程完成。

acquireQueued

接着解读方法,通过tryAcquire()和addWaiter(),我们的线程还是没有拿到资源,并且还被排到了队列的尾部,如果让你来设计的话,这个时候你会怎么处理线程呢?其实答案也很简单,能做的事无非两个:

1、循环让线程再抢资源。但仔细一推敲就知道不合理,因为如果有多个线程都参与的话,你抢我也抢只会降低系统性能

2、进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源

毫无疑问,选择2更加靠谱,acquireQueued方法做的也是这样的处理:

acquireQueued()用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。该方法的实现分成两部分:如果当前节点已经成为头结点,尝试获取锁(tryAcquire)成功,然后返回;否则检查当前节点是否应该被park,然后将该线程park并且检查当前线程是否被可以被中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  final boolean acquireQueued(final Node node,int arg){
boolean failed=true;
try{ // 标记是否会被中断
boolean interrupted=false; // CAS自旋
for(;;){
// 获取当前结点的前结点
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);
p.next=null;
// help GC
failed=false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())
interrupted=true;
}
}finally{
if(failed) // 获取锁失败,则将此线程对应的node的waitStatus改为CANCEL
cancelAcquire(node);
}
}

acquireQueued方法的流程是这样的:

  • 1、CAS自旋,先判断当前传入的Node的前结点是否为head结点,是的话就尝试获取锁,获取锁成功的话就把当前结点置为head,之前的head置为null(方便GC),然后返回

  • 2、如果前驱结点不是head或者加锁失败的话,就调用 shouldParkAfterFailedAcquire,将前驱节点的waitStatus变为了SIGNAL=-1,最后执行 parkAndChecknIterrupt 方法,调用 LockSupport.park()
    挂起当前线程,parkAndCheckInterrupt在挂起线程后会判断线程是否被中断,如果被中断的话,就会重新跑acquireQueued方法的CAS自旋操作,直到获取资源。

ps:LockSupport.park方法会让当前线程进入waitting状态,在这种状态下,线程被唤醒的情况有两种,一是被unpark(),二是被interrupt(),所以,如果是第二种情况的话,需要返回被中断的标志,然后在acquire顶层方法的窗口那里自我中断补上

此时,因为线程A还未释放锁,所以线程B状态都是被挂起的,

shouldParkAfterFailedAcquire(Node, Node)

shouldParkAfterFailedAcquire方法通过对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作,至于每个Node的状态表示,可以参考接口文档。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
int ws=pred.waitStatus;
if(ws==Node.SIGNAL) // 前驱结点等待状态为"SIGNAL",那么自己就可以安心等待被唤醒了
return true;
if(ws>0){ /* * 前驱结点被取消了,通过循环一直往前找,直到找到等待状态有效的结点(等待状态值小于等于0) , * 然后排在他们的后边,至于那些被当前Node强制"靠后"的结点,因为已经被取消了,也没有引用链, * 就等着被GC了 */
do{
node.prev=pred=pred.prev;
}while(pred.waitStatus>0);
pred.next=node;
}else{ // 如果前驱正常,那就把前驱的状态设置成SIGNAL
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}

在这里插入图片描述

到这里,加锁的流程就分析完了.

获取锁并等待的过程:

当lock()执行的时候:

  • 先快速获取锁,当前没有线程执行的时候直接获取锁
  • 尝试获取锁,当没有线程执行或是当前线程占用锁,可以直接获取锁
  • 将当前线程包装为node放入同步队列,设置为尾节点
  • 前一个节点如果为头节点,再次尝试获取一次锁
  • 将前一个有效节点设置为SIGNAL
  • 然后阻塞直到被唤醒

为了方便你们更加清晰理解,我加多一张流程图
在这里插入图片描述

释放锁

说完了加锁,我们来看看释放锁是怎么做的,AQS中释放锁的方法是release(),当调用该方法时会释放指定量的资源 (也就是锁) ,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

release

当ReentrantLock进行释放锁操作时,调用的是AQS的release(1)操作

1
2
3
4
5
6
7
8
9
public final boolean release(int arg){
if(tryRelease(arg)){
Node h=head;
if(h!=null&&h.waitStatus!=0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease

代码上可以看出,核心的逻辑都在tryRelease方法中,该方法的作用是释放资源,AQS里该方法没有具体的实现,需要由自定义的同步器去实现,我们看下ReentrantLock代码中对应方法的源码:

在tryRelease(arg)中会将锁释放一次,如果当前state是1,且当前线程是正在占用的线程,释放锁成功,返回true,否则因为是可重入锁,释放一次可能还在占用,应一直释放直到state为0为止

1
2
3
4
5
6
7
8
9
10
11
12
13
14

@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease方法会减去state对应的值,如果state为0,也就是已经彻底释放资源,就返回true,并且把独占的线程置为null,否则返回false。

此时AQS中的数据就会变成这样:
在这里插入图片描述

完全释放资源后,当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor(),

然后优先找下一个节点,如果取消了就从尾节点开始找,找到最前面一个可用的节点
将其取消阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   private void unparkSuccessor(Node node){
_int ws=node.waitStatus;
if(ws< 0) //将head结点的状态置为0
compareAndSetWaitStatus(node,ws,0);
//找到下一个需要唤醒的结点s
Node s=node.next;
//如果为空或已取消
if(s==null||s.waitStatus>0){
s=null;
// 从后向前,直到找到等待状态小于0的结点,前面说了,结点waitStatus小于0时才有效
for(Node t=tail;t!=null&&t!=node;t=t.prev)
if(t.waitStatus<=0)
s=t;
}
// 找到有效的结点,直接唤醒
if(s!=null)
LockSupport.unpark(s.thread);//唤醒_
}

方法的逻辑很简单,就是先将head的结点状态置为0,避免下面找结点的时候再找到head,然后找到队列中最前面的有效结点,然后唤醒,我们假设这个时候线程A已经释放锁,那么此时队列中排最前边竞争锁的线程B就会被唤醒。然后被唤醒的线程B就会尝试用CAS获取锁回到acquireQueued方法的逻辑,

阻塞在acquireQueued的地方在唤醒之后开始继续执行,当前节点已经是最前面的一个可用(未取消)节点了,经过不断的for循环以及在shouldParkAfterFailedAcquire中不断向前寻找可用节点,因此这个被唤醒的节点一定可以使其之前的节点为head。然后获取锁成功。

但是此时节点会与新加入的节点竞争,也就是不公平锁的由来。

在公平锁中,在tryAcquire时会判断之前是否有等待的节点hasQueuedPredecessors(),如果有就不会再去获取锁了,因此能保证顺序执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
for(;;){
// 获取当前结点的前结点
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);
p.next=null;
// help GC
failed=false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())
interrupted=true;
}

当线程B获取锁之后,会把当前结点赋值给head,然后原先的前驱结点 (也就是原来的head结点) 去掉引用链,方便回收,这样一来,线程B获取锁的整个过程就完成了,此时AQS的数据就会变成这样:
在这里插入图片描述
到这里,我们已经分析完了AQS独占模式下加锁和释放锁的过程,也就是tryAccquire->tryRelease这一链条的逻辑,除此之外,AQS中还支持共享模式的同步,这种模式下关于锁的操作核心其实就是tryAcquireShared->tryReleaseShared这两个方法,我们可以简单看下

共享模式

在这里插入图片描述

ReentrantReadWriteLock是Java中读写锁的实现,写写互斥,读写互斥,读读共享。读写锁在内部分为读锁和写锁,因为我们要探索共享模式,因此更关注读锁

获取锁

AQS中,共享模式获取锁的顶层入口方法是acquireShared,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源,

1
2
3
4
public final void acquireShared(int arg){
if(tryAcquireShared(arg)< 0)
doAcquireShared(arg);
}

该方法里包含了两个方法的调用,

  • tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。
  • doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。
tryAcquireShared

tryAcquireShared在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作

返回值有三种定义:

  • 负值代表获取失败; (当前有写锁,返回-1,即未获取共享锁,需要执行下一步doAcquireShared)
  • 0代表获取成功,但没有剩余的资源,也就是state已经为0;
  • 正值代表获取成功,而且state还有剩余,其他线程可以继续领取

当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared方法

1
2
3
4
5
6
7
8
9
10
11
12
protected final int tryAcquireShared(int unused){
Thread current=Thread.currentThread();
int c=getState();
if(exclusiveCount(c)!=0&&getExclusiveOwnerThread()!=current)
return-1;
int r=sharedCount(c);
if(!readerShouldBlock()&&r<MAX_COUNT &&compareAndSetState(c,c+SHARED_UNIT)){
//设置firstReader,计算数量,略
return 1;
}
return fullTryAcquireShared(current);
}

设置共享锁需要修改state的数量,表示获取共享锁的线程的数量,当共享锁的获取存在竞争时,即compareAndSetState(c, c + SHARED_UNIT))可能设置失败,此时进入fullTryAcquireShared(current)进行获取共享锁的完整版操作。

也就是说共享锁获取时:
如果当前没有独占锁在占用,AQS根据其实现类的tryAcquireShared来实现让一个共享锁直接获取到锁(可以直接执行)
当有独占锁在占用是,让共享锁去等待直到独占锁解锁为止,也就是doAcquireShared(arg)的逻辑

doAcquireShared

此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void doAcquireShared(int arg){
// 加入队列尾部
final Node node=addWaiter(Node.SHARED);
boolean failed=true;
try{
boolean interrupted=false;
// CAS自旋
for(;;){
final Node p=node.predecessor();
// 判断前驱结点是否是head
if(p==head){
// 尝试获取一定数量的锁
int r=tryAcquireShared(arg);
if(r>=0){
// 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
setHeadAndPropagate(node,r);
// 让前驱结点去掉引用链,方便被GC
p.next=null;
// help GC
if(interrupted)
selfInterrupt();
failed=false;
return;
}
}
// 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒
if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())
interrupted=true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}

doAcquireShared(arg) 除了将线程封装成节点入队外还表达了3个思想:

  • 什么时候该执行

  • 什么时候该传播

  • 什么时候该等待(阻塞) 其中入队、执行和等待的逻辑基本和独占锁一样,

  • 入队:都是加入等待队列的末尾,成为tail节点;

  • 执行:判断当前节点的前一个节点是不是头节点,如果是的话尝试获取锁,如果获取到了就执行;

  • 等待:获取不到或前一个节点不是头节点就代表该线程需要暂时等待,直到被叫醒为止。设置前一个节点为SIGNAL状态,然后进入等待。

其中不同的就是共享锁的传播逻辑:

想象一下,当前有一个写锁正在占用,有多个读锁在等待,当写锁释放时,第二个线程也就是想要获取读锁的线程就可以获取锁了。获取到之后当前正在用的锁就是读锁了,那后面的读锁呢,因为读锁是共享的,后面的读锁应该也能够依次获取读锁,也就是读锁的传播机制。

1
2
3
4
5
6
7
8
9
10
11
   private void setHeadAndPropagate(Node node,int propagate){
Node h=head;
// head指向自己
setHead(node);
// 如果还有剩余量,继续唤醒下一个邻居线程
if(propagate>0||h==null||h.waitStatus< 0){
Node s=node.next;
if(s==null||s.isShared())
doReleaseShared();
}
}

将当前的节点设置为头节点,判断如果是共享锁,执行doReleaseShared(),唤醒当前节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 private void doReleaseShared(){
for(;;){
Node h=head;
if(h!=null&&h!=tail){
int ws=h.waitStatus;
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;
unparkSuccessor(h);
}
else if(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;
}
if(h==head)
break;
}
}

当前节点唤醒之后doAcquireShared(int arg)会继续执行,因为之前的节点被设置为头节点,如果后续是获取共享锁的节点会继续执行setHeadAndPropagate,一直传播下去直到遇到获取独占锁的节点。

看到这里,你会不会一点熟悉的感觉,这个方法的逻辑怎么跟上面那个acquireQueued() 那么类似啊?对的,其实两个流程并没有太大的差别。只是doAcquireShared()比起独占模式下的获取锁上多了一步唤醒后继线程的操作,当获取完一定的资源后,发现还有剩余的资源,就继续唤醒下一个邻居线程,这才符合”共享”的思想嘛。

这里我们可以提出一个疑问共享模式下,当前线程释放了一定数量的资源,但这部分资源满足不了下一个等待结点的需要的话,那么会怎么样?

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

接着往下说吧,唤醒下一个邻居线程的逻辑在doReleaseShared()中,我们放到下面的释放锁来解析。

共享锁的获取总结如下:
  • 尝试获取共享锁,如果当前是共享锁或无锁,设置共享锁的state,获取锁
  • 如果当前是写锁,进入等待流程
  • 入队,加入等待队列的末尾,成为tail节点
  • 判断当前节点的前一个节点是不是头节点,如果是的话尝试获取锁,如果获取到了就执行
  • 获取不到或前一个节点不是头节点就代表该线程需要暂时等待,直到被叫醒为止。设置前一个节点为SIGNAL状态,然后进入等待
  • 如果可以获取到锁,设置头节点并进入共享锁节点传播流程

释放锁

共享模式释放锁的顶层方法是releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}

该方法同样包含两部分的逻辑:

  • tryReleaseShared:释放资源。
  • doAcquireShared:唤醒后继结点。

跟tryAcquireShared方法一样,tryReleaseShared在AQS中没有具体的实现,由子同步器自己去定义,但功能都一样,就是释放一定数量的资源。

释放完资源后,线程不会马上就收工,而是唤醒等待队列里最前排的等待结点

tryReleaseShared

在tryReleaseShared(arg),基本就是tryAcquireShared(int unused)的反向操作

将设置的HoldCounter减少,firstReader设置null,state减少,将tryAcquireShared(int unused)添加的状态全部反向还原回去

当共享锁全部释放完毕,返回true,否则返回false

doAcquireShared

唤醒后继结点的工作在doReleaseShared()方法中完成,我们可以看下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doReleaseShared(){
for(;;){// 获取等待队列中的head结点
Node h=head;
if(h!=null&&h!=tail){
int ws=h.waitStatus;
// head结点waitStatus = -1,唤醒下一个结点对应的线程
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;
// loop to recheck cases
// 唤醒后继结点
unparkSuccessor(h);
}else if(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue; // loop on failed CAS
}
if(h==head) // loop if head changed
break;
}
}

代码没什么特别的,就是如果等待队列head结点的waitStatus为-1的话,就直接唤醒后继结点,唤醒的方法unparkSuccessor()在上面已经讲过了,这里也没必要再复述。

总的来看,AQS共享模式的运作流程和独占模式很相似。

2. Condition

介绍完了AQS的核心功能,我们再扩展一个知识点,在AQS中,除了提供独占/共享模式的加锁/解锁功能,它还对外提供了关于Condition的一些操作方法。

Condition是个接口,在jdk1.5版本后设计的,基本的方法就是await()和SIGNAL()方法,功能大概就对应Object的wait()和notify(),Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现
,AQS中就定义了一个类ConditionObject来实现了这个接口,
在这里插入图片描述
那么它应该怎么用呢?我们可以简单写个demo来看下效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConditionDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread tA = new Thread(() -> {
lock.lock();
try {
System.out.println("线程A加锁成功");
System.out.println("线程A执行await被挂起");
condition.await();
System.out.println("线程A被唤醒成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("线程A释放锁成功");
}
});
Thread tB = new Thread(() -> {
lock.lock();
try {
System.out.println("线程B加锁成功");
condition.SIGNAL();
System.out.println("线程B唤醒线程A");
} finally {
lock.unlock();
System.out.println("线程B释放锁成功");
}
});
tA.start();
tB.start();
}
}

执行main函数后结果输出为:

1
2
3
4
5
6
7
线程A加锁成功 
线程A执行await被挂起
线程B加锁成功
线程B唤醒线程A
线程B释放锁成功
线程A被唤醒成功
线程A释放锁成功

代码执行的结果很容易理解,线程A先获取锁,然后调用await()方法挂起当前线程并释放锁,线程B这时候拿到锁,然后调用SIGNAL唤醒线程A。

毫无疑问,这两个方法让线程的状态发生了变化,我们仔细来研究一下,

翻看AQS的源码,我们会发现Condition中定义了两个属性firstWaiter和lastWaiter,前面说了,AQS中包含了一个FIFO的CLH等待队列,每个Conditon对象就包含这样一个等待队列,而这两个属性分别表示的是等待队列中的首尾结点,

1
2
3
4
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

注意:Condition当中的等待队列和AQS主体的同步等待队列是分开的,两个队列虽然结构体相同,但是作用域是分开的

await

先看await()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
   public final void await()throws InterruptedException{
if(Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到等待队列中
Node node=addConditionWaiter();
// 完全释放占有的资源,并返回资源数
int savedState=fullyRelease(node);
int interruptMode=0;
// 循环判断当前结点是不是在Condition的队列中,是的话挂起
while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode=checkInterruptWhileWaiting(node))!=0)
break;
}
if(acquireQueued(node,savedState)&&interruptMode!=THROW_IE)
interruptMode=REINTERRUPT;
if(node.nextWaiter!=null)// clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode!=0)
reportInterruptAfterWait(interruptMode);
}

当一个线程调用 Condition.await()方法,将会以当前线程构造结点,这个结点的waitStatus赋值为Node.CONDITION, 也就是-2,并将结点从尾部加入等待队列,然后尾部结点就会指向这个新增的结点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
   private Node addConditionWaiter(){
Node t=lastWaiter;
// If lastWaiter is cancelled, clean out.
if(t!=null&&t.waitStatus!=Node.CONDITION){
unlinkCancelledWaiters();
t=lastWaiter;
}
Node node=new Node(Thread.currentThread(),Node.CONDITION);
if(t==null)
firstWaiter=node;
else
t.nextWaiter=node;
lastWaiter=node;
return node;
}

我们依然用上面的demo来演示,此时,线程A获取锁并调用Condition.await()方法后,AQS内部的数据结构会变成这样:
在这里插入图片描述
在Condition队列中插入对应的结点后,线程A会释放所持有的资源,走到while循环那层逻辑,

1
2
3
4
5
   while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode=checkInterruptWhileWaiting(node))!=0)
break;
}

isOnSyncQueue方法的会判断当前的线程节点是不是在同步队列中,这个时候此结点还在Condition队列中,所以该方法返回false,这样的话循环会一直持续下去,线程被挂起,等待被唤醒,此时,线程A的流程暂时停止了。

当线程A调用await()方法挂起的时候,线程B获取到了线程A释放的资源,然后执行SIGNAL()方法:

SIGNAL

1
2
3
4
5
6
7
   public final void SIGNAL(){
if(!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first=firstWaiter;
if(first!=null)
doSIGNAL(first);
}

先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。接着调用doSIGNAL()方法来唤醒线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   private void doSIGNAL(Node first){
// 循环,从队列一直往后找不为空的首结点
do{
if((firstWaiter=first.nextWaiter)==null)
lastWaiter=null;
first.nextWaiter=null;
}while(!transferForSIGNAL(first)&&(first=firstWaiter)!=null);`
}

final boolean transferForSIGNAL(Node node){
// CAS循环,将结点的waitStatus改为0
if(!compareAndSetWaitStatus(node,Node.CONDITION,0))
return false; // 上面已经分析过,此方法会把当前结点加入到等待队列中,并返回前驱结点
Node p=enq(node);
int ws=p.waitStatus;
if(ws>0||!compareAndSetWaitStatus(p,ws,Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

从doSIGNAL的代码中可以看出,这时候程序寻找的是Condition等待队列中首结点firstWaiter的结点,此时该结点指向的是线程A的结点,所以之后的流程作用的都是线程A的结点。

这里分析下transferForSIGNAL方法,先通过CAS自旋将结点waitStatus改为0,然后就把结点放入到同步队列 (此队列不是Condition的等待队列) 中,然后再用CAS将同步队列中该结点的前驱结点waitStatus改为Node.SIGNAL,也就是-1,此时AQS的数据结构大概如下(额…..少画了个箭头,大家就当head结点是线程A结点的前驱结点就好):
在这里插入图片描述

回到await()方法,当线程A的结点被加入同步队列中时,isOnSyncQueue()会返回true,跳出循环,

1
2
3
4
5
6
7
8
9
10
11
while(!isOnSyncQueue(node)){
LockSupport.park(this);
if((interruptMode=checkInterruptWhileWaiting(node))!=0)
break;
}
if(acquireQueued(node,savedState)&&interruptMode!=THROW_IE)
interruptMode=REINTERRUPT;
if(node.nextWaiter!=null) // clean up if cancelled
unlinkCancelledWaiters();
if(interruptMode!=0)
reportInterruptAfterWait(interruptMode);

接着执行acquireQueued()方法,这里就不用多说了吧,尝试重新获取锁,如果获取锁失败继续会被挂起,直到另外线程释放锁才被唤醒。

所以,当线程B释放完锁后,线程A被唤醒,继续尝试获取锁,至此流程结束。

对于这整个通信过程,我们可以画一张流程图展示下:

在这里插入图片描述

总结

说完了Condition的使用和底层运行机制,我们再来总结下它跟普通 wait/notify 的比较,一般这也是问的比较多的,Condition大概有以下两点优势:

Condition 需要结合 Lock 进行控制,使用的时候要注意一定要对应的unlock(),可以对多个不同条件进行控制,只要new 多个 Condition对象就可以为多个线程控制通信wait/notify 只能和 synchronized 关键字一起使用,并且只能唤醒一个或者全部的等待队列;

Condition 有类似于 await 的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是 park/unpark 的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是 wait/notify 会产生先唤醒再挂起的死锁。

无论是独占还是共享模式,或者结合是Condition工具使用,AQS本质上的同步功能都是通过对锁和队列中结点的操作来实现的

3. ReentrantLock

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解ReentrantLock的特性,我们先将ReentrantLock跟常用的Synchronized进行比较,其特性如下:
在这里插入图片描述
下面通过伪代码,进行更加直观的比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
   // **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this){}
// 2.用于对象
synchronized (object){}
// 3.用于方法
public synchronized void test(){}
// 4.可重入
for(int i=0;i< 100;i++){
synchronized (this){}
}
// **************************ReentrantLock的使用方式**************************
public void test()throw Exception{
// 1.初始化选择公平锁、非公平锁
ReentrantLock lock=new ReentrantLock(true);
// 2.可用于代码块
lock.lock();
try{
try{
// 3.支持多种加锁方式,比较灵活; 具有可重入特性
if(lock.tryLock(100,TimeUnit.MILLISECONDS)){}
}finally{
// 4.手动释放锁
lock.unlock()
}
}finally{
lock.unlock();
}
}

ReentrantLock 最基本的使用方式

1
2
3
4
5
6
7
8
9
10
11
12
class X {
private final ReentrantLock lock = new ReentrantLock();

public void m() {
lock.lock();
try {
doSomething();
} finally {
lock.unlock()
}
}
}

当创建ReentrantLock时默认使用非公平锁,效率高于公平锁,暂不讨论公平锁。

ReentrantReadWriteLock的读锁的最基本的使用方式如下

1
2
3
4
5
6
7
8
9
10
11
12
class X {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public void m() {
rwl.readLock().lock();
try {
read();
} finally {
rwl.readLock().unlock();
}
}
}

2. synchronize

synchronized可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,同时它还可以保证共享变量的内存可见性。Synchronized主要有以下三个作用:保证互斥性、保证可见性、保证顺序性

3. synchronize与lock的区别

参考文章

  1. 图文并茂:AQS 是怎么运行的?
  2. AQS 简介
  3. Java技术之AQS详解
  4. Java并发之AQS详解
  5. 从ReentrantLock的实现看AQS的原理及应用

评论