抽象同步队列AQS

2024-01-08 18:57:55

AQS——锁的底层支持

AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。

另外,大多数开发者可能永远不会直接使用AQS,但是知道其原理对于架构设计还是很有帮助的。

下面看下AQS的类图结构。
在这里插入图片描述
由该图可以看到,AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。

其中Node中的thread变量用来存放进入AQS队列里面的线程。

Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的。

EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的。

waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点)。

prev记录当前节点的前驱节点,next记录当前节点的后继节点。

在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。

对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数。

对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数。

对于semaphore来说,state用来表示当前可用信号的个数。

对于CountDownlatch来说,state用来表示计数器当前的值。

AQS有个内部类ConditionObject,用来结合锁实现线程同步。

ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。

ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。

对于AQS来说,线程同步的关键是对状态值state进行操作。

根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。

在独占方式下获取和释放资源使用的方法为:

  • void acquire(int arg)
  • void acquirelnterruptibly(int arg)
  • boolean release(int arg)

在共享方式下获取和释放资源的方法为:

  • void acquireShared(int arg)
  • void acquireSharedInterruptibly(int arg)
  • boolean releaseShared(int arg)

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。

比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。

比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。

在独占方式下,获取与释放资源的流程如下:

  1. 当一个线程调用acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。
    在这里插入图片描述
  2. 当一个线程调用release(int arg)方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread)。
    被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。

在这里插入图片描述
需要注意的是,AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。

子类在实现tryAcquire和tryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。

子类还需要定义,在调用acquire和release方法时state状态值的增减代表什么含义。

比如继承自AQS实现的独占锁ReentrantLock,定义当status为0时表示锁空闲,为1时表示锁已经被占用。

在重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false。

比如继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前state的值从1修改为0,并设置当前锁的持有者为null,然后返回true,如果CAS失败则返回false。

在共享方式下,获取与释放资源的流程如下:

  1. 当线程调用acquireShared(int arg)获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己。
    在这里插入图片描述
  2. 当一个线程调用releaseShared(int arg)时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread)。
    被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
    在这里插入图片描述
    同样需要注意的是,AQS类并没有提供可用的tryAcquireShared和tryReleaseShared方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquireShared和tryReleaseShared需要由具体的子类来实现。

子类在实现tryAcquireShared和tryReleaseShared时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。

比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryAcquireShared时,首先查看写锁是否被其他线程持有,如果是则直接返回false,否则使用CAS递增state的高16位(在ReentrantReadWriteLock中,state的高16位为获取读锁的次数)。

比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryReleaseShared时,在内部需要使用CAS算法把当前state值的高16位减1,然后返回true,如果CAS失败则返回false。

基于AQS实现的锁除了需要重写上面介绍的方法外,还需要重写isHeldExclusively方法,来判断锁是被当前线程独占还是被共享。

另外,也许你会好奇,独占方式下的void acquire(int arg)和void acquirelnterruptibly(int arg),与共享方式下的void acquireShared(int arg)和void acquireSharedInterruptibly(int arg),这两套函数中都有一个带有Interruptibly关键字的函数,那么带这个关键字和不带有什么区别呢?我们来讲讲。

其实不带Interruptibly关键字的方法的意思是不对中断进行响应,也就是线程在调用不带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。

而带Interruptibly关键字的方法要对中断进行响应,也就是线程在调用带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException异常而返回。

最后,我们来看看如何维护AQS提供的队列,主要看入队操作。

  • 入队操作:当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列。
    在这里插入图片描述
    下面结合代码和节点图来讲解入队的过程。
    在这里插入图片描述

如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如图中(default)所示。

也就是队列头、尾节点都指向null;当执行代码(1)后节点t指向了尾部节点,这时候队列状态如图所示。

这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如图中(Ⅱ)所示。

到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如图(II)所示。

然后执行代码(3)设置node的前驱节点为尾部节点,这时候队列状态如图中(IV)所示。

然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如图中(V)所示。

CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如图中(VI)所示。

AQS——条件变量的支持

notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。

它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。

在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。

在这里插入图片描述代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。

代码(2)使用创建的Lock对象的newCondition()方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。

代码(3)首先获取了独占锁,代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。

当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。

需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出java.lang.IllegalMonitorStateException异常。

代码(5)则释放了获取的锁。

其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。

调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。

调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。

经过上面解释,相信大家已经知道条件变量是什么,它是用来做什么的了。

在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。

在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。

注意这个条件队列和AQS队列不是一回事。

在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。

这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

在这里插入图片描述
在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。
在这里插入图片描述
需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。

需要由AQS的子类来提供newCondition函数。

下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。

在这里插入图片描述
代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。

注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。

如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。

这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。

当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。

最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。

在这里插入图片描述

文章来源:https://blog.csdn.net/zhuyufan1986/article/details/135460946
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。