【多线程及高并发 五】AQS & ReentranLock 详解
👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦??
?时间是条环形跑道,万物终将归零,亦得以圆全完美

AQS & ReentranLock 详解
多线程及高并发系列
AQS
AQS 核心思想:
- 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
- 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, AND Hagersten locks) 实现的
CLH 锁算法的核心思想是使用一个虚拟的双向链表来表示等待队列,每个等待线程都有一个对应的节点。每个节点都有一个状态标志位,用于表示线程是否已经获得锁。当线程尝试获取锁时,会通过不断自旋的方式检查前驱节点的状态,直到前驱节点释放锁
AQS 核心思想总结:
- 状态管理:AQS 通过一个整型的状态值来表示同步器的状态。不同的同步器可以根据自身的需求自定义状态的含义和取值范围。状态的变化和转换由AQS内部管理和控制。
- 独占模式和共享模式:AQS支持两种基本的同步模式,即独占模式和共享模式,意为能否被多个线程同时访问同步器
- 等待队列:AQS 内部维护了一个等待队列,用于管理等待获取同步器的线程。这个队列是一个FIFO队列,当线程无法获取到同步器时,会被加入到等待队列中,等待被唤醒。
- 状态变更和线程阻塞:当一个线程尝试获取同步器时,如果同步器的状态不符合要求(例如已被占用),线程会被置为阻塞状态,并被加入到等待队列中。当其他线程释放同步器或者发生其他特定条件时,阻塞的线程会被唤醒。
- CAS操作:AQS 使用
CAS(Compare and Swap)操作来确保状态的原子更新。这样可以避免使用锁或其他同步机制,提高并发性能
AQS 核心数据结构 CLH 锁的详细解读可见Java AQS 核心数据结构-CLH 锁
AQS 模板方法
AQS 使用了模板方法的设计模式,使用者继承AbstractQueuedSynchronizer并重写指定的方法,可以定制化同步器的行为,适应不同的并发场景
基本方法如下:
| 方法名 | 描述 |
|---|---|
| protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
| protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
| protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
| protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
自定义同步器实现的相关方法,并通过修改State字段来实现多线程的独占模式或者共享模式
AQS 锁流程

AQS 使用int 成员变量state表示同步状态,通过内置的FIFO 线程等待/等待队列来完成获取资源线程的排队工作。本质上是 CAS + volatile
state变量由volatile修饰,用于展示当前临界资源的获锁情况
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
状态信息 state 可以通过protected final类型的getState()、setState()和compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 资源共享方式
AQS 提供了对资源的共享方式的支持,可以通过继承AQS来实现不同类型的共享同步器。AQS支持两种基本的资源共享方式:独占模式(Exclusive mode)和共享模式(Shared mode)
- 独占模式:在独占模式下,同一时刻只有一个线程可以获取到资源的访问权,其他线程需要等待当前线程释放资源才能获取。AQS通过
tryAcquire和tryRelease方法来实现独占模式的同步逻辑,如ReentrantLock - 共享模式:在共享模式下,多个线程可以同时获取资源的访问权。这种模式适用于多个线程可以同时读取资源的情况,但对于写操作仍然需要互斥。AQS通过
tryAcquireShared和tryReleaseShared方法来实现共享模式的同步逻辑,如CountDownLatch和Semaphore
AQS并不直接限制资源的共享方式,而是提供了底层的同步原语和模板方法,使开发者能够根据具体需求实现自定义的同步器。通过重写AQS的模板方法,可以实现不同类型的资源共享方式,包括独占模式和共享模式,以满足不同的并发场景和需求
AQS 源码
此段修改自AbstractQueuedSynchronizer数据结构,详细源码分析可参考此文
AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。
AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配
Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue

类的继承关系
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
AbstractOwnableSynchronizer抽象类的源码:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 构造方法
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;
// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 获取独占线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍
类的内部类 - Node类
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造方法
Node() { // Used to establish initial head or SHARED marker
}
// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下:
| 结点状态 | 结点状态值 | 含义 |
|---|---|---|
CANCELLED | 1 | 当前的线程被取消 |
SIGNAL | -1 | 当前节点的后继节点包含的线程需要运行,需要进行unpark操作 |
CONDITION | -2 | 当前节点在等待condition,也就是在condition queue中 |
PROPAGATE | -3 | 当前场景下后续的acquireShared能够得以执行 |
| - | 0 | 当前节点在sync queue中,等待获取锁 |
类的内部类 - ConditionObject类
在 AQS 中,
ConditionObject是用于支持条件变量的内部类。它实现了Condition接口,并提供了条件等待和通知的功能
ConditionObject的作用是允许线程在满足特定条件之前等待,并在条件满足时被其他线程通知。它为多线程之间的协调提供了一种机制
public interface Condition {
// 使当前线程进入等待状态,直到接收到信号或被中断。在等待期间,当前线程会释放锁,并加入到条件等待队列中
void await() throws InterruptedException;
// 与await()类似,但是忽略中断的影响,即不会响应中断
void awaitUninterruptibly();
// 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 使当前线程进入等待状态,直到接收到信号、被中断或等待到指定的时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在该条件上的线程,使其从等待状态变为可运行状态
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
ConditionObject的使用通常需要与Lock接口配合使用,通过调用Lock的newCondition()方法获取一个Condition对象。**在调用条件对象的等待和通知方法之前,需要先获取相应的锁。**通过使用条件变量,线程可以更精确地控制线程的等待和唤醒,以及执行特定的操作序列
类的核心方法 - acquire

AQS(AbstractQueuedSynchronizer)中的acquire方法是用于获取同步状态(获取锁)的核心方法
- 调用
tryAcquire方法尝试获取同步状态 - 如果
tryAcquire方法返回失败(负值),则调用addWaiter方法将当前线程包装成一个节点(Node)并加入到等待队列中 - 调用
acquireQueued方法使线程进入等待状态,直到获取到同步状态或被中断。acquireQueued方法会不断尝试获取同步状态,如果获取失败,则将节点加入到等待队列中,并使线程阻塞。在等待期间,线程会不断自旋尝试获取同步状态 - 当线程被唤醒后,会再次调用
tryAcquire方法尝试获取同步状态。如果成功获取到同步状态,则调用selfInterrupt方法自我中断,以响应中断的请求 - 如果获取同步状态失败或被中断,则线程会继续进入循环等待,直到成功获取同步状态或抛出异常
acquire方法的伪代码:
public void acquire(int arg) {
if (tryAcquire(arg)) {
return;
}
Node node = addWaiter();
for (;;) {
if (acquireQueued(node, arg)) {
selfInterrupt();
return;
}
}
}
acquireQueued - 等待队列中线程出队列时机流程图

类的核心方法 - release
AQS(AbstractQueuedSynchronizer)中的release方法是用于释放同步状态(释放锁)的核心方法。下面详细介绍了release方法的代码步骤:
- 调用
tryRelease方法尝试释放同步状态。tryRelease方法是一个抽象方法,由子类实现具体的释放逻辑。如果tryRelease方法返回 true,表示释放同步状态成功,直接返回。 - 如果
tryRelease方法返回 false,表示释放同步状态失败,此时会抛出IllegalMonitorStateException异常,表示当前线程未持有该同步状态 - 调用
unparkSuccessor方法唤醒等待队列中的后继节点。unparkSuccessor方法会找到等待队列中的下一个有效节点(即状态不为取消的节点),并调用LockSupport.unpark方法唤醒该节点对应的线程
release方法的伪代码
public void release(int arg) {
if (tryRelease(arg)) {
unparkSuccessor(node);
} else {
throw new IllegalMonitorStateException();
}
}
Lcok 框架和工具类

自定义 Lock
下述代码中,SimpleLock类包装了一个内部类Sync,它继承了AbstractQueuedSynchronizer。Sync类重写了tryAcquire、tryRelease和isHeldExclusively方法来实现锁的获取、释放和判断当前线程是否持有锁的逻辑
public class SimpleLock {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 尝试获取锁,如果成功返回true,否则返回false
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
// 尝试释放锁,如果成功返回true,否则返回false
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
// 判断当前线程是否持有锁
return getState() == 1;
}
final boolean isLocked() {
// 是否已上锁
return getState() != 0;
}
}
}
使用实例:
public class Main {
private static final SimpleLock lock = new SimpleLock();
public static void main(String[] args) {
// 创建两个线程并启动
Thread thread1 = new Thread(() -> {
lock.lock();
try {
// 执行需要互斥的代码块
System.out.println("Thread 1: Lock acquired");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1: Lock released");
}
});
Thread thread2 = new Thread(() -> {
lock.lock();
try {
// 执行需要互斥的代码块
System.out.println("Thread 2: Lock acquired");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 2: Lock released");
}
});
thread1.start();
thread2.start();
}
}
这只是一个简单的示例,实际上,AQS的应用要更加复杂。在实际场景中,你可能需要考虑更多的因素,例如可重入性、公平性等。如果需要在生产环境中使用锁,优先考虑使用Java标准库提供的java.util.concurrent.locks包中的锁实现,如ReentrantLock
ReentranLock
ReentrantLock 提供了可见性的保证,即当一个线程释放锁时,会将对变量的修改刷新到主内存中,以便其他线程获取锁后能够看到最新的值。
如果需要在锁范围外对成员变量的修改对其他线程可见,可以将该变量声明为
volatile

公平锁/非公平锁
ReentrantLock可以以公平锁(fair lock)模式或非公平锁(nonfair lock)模式运行。这两种模式的区别在于线程获取锁的顺序。
- 公平锁模式:在公平锁模式下,当多个线程等待获取锁时,锁将按照线程等待的先后顺序进行分配。 也就是说,等待时间最长的线程将最先获取到锁。公平锁模式确保了线程获取锁的公平性,避免了饥饿现象。但是,公平锁模式的性能相对较低,因为需要维护一个有序的等待队列。
- 非公平锁模式:在非公平锁模式下,线程获取锁的顺序是不确定的,不一定按照等待时间的先后顺序分配。 线程有一定几率在尝试获取锁时绕过其他等待线程而直接获取到锁。非公平锁模式的性能相对较高,因为它减少了线程间切换和调度造成的开销,但可能会导致某些线程长时间等待
非公平锁尝试获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部;公平锁则
//
static final class NonfairSync extends Sync {
// 版本号
private static final long serialVersionUID = 7316153563782823691L;
// 获得锁
final void lock() {
if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
// 把当前线程设置独占了锁
setExclusiveOwnerThread(Thread.currentThread());
else // 锁已经被占用,或者set失败
// 以独占模式获取对象,忽略中断
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
加锁流程

- 通过
ReentrantLock的加锁方法lock进行加锁操作 - 会调用到内部类
Sync的lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的lock方法,本质上都会执行AQS的acquire方法 - AQS的
acquire方法会执行tryAcquire方法,因此执行了自定义同步器ReentrantLock中的tryAcquire方法,而ReentrantLock根据锁类型不同执行不同的tryAcquire tryAcquire是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟ReentrantLock自定义同步器无关
交替输出案例
public static void main(String[] args) {
char[] a = "ABCDE".toCharArray();
char[] b = "12345".toCharArray();
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
for (char c : a) {
System.out.print(c);
condition.signal();
condition.await();
}
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
for (char c : b) {
System.out.print(c);
condition.signal();
condition.await();
}
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
参考资料:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!