【多线程及高并发 五】AQS & ReentranLock 详解

2024-01-08 11:44:00

👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列MySQL系列Redis系列Leetcode算法系列GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦??
?时间是条环形跑道,万物终将归零,亦得以圆全完美


多线程及高并发系列


AQS

AQS 核心思想:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
  • 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, AND Hagersten locks) 实现的

CLH 锁算法的核心思想是使用一个虚拟的双向链表来表示等待队列,每个等待线程都有一个对应的节点。每个节点都有一个状态标志位,用于表示线程是否已经获得锁。当线程尝试获取锁时,会通过不断自旋的方式检查前驱节点的状态,直到前驱节点释放锁

AQS 核心思想总结:

  1. 状态管理:AQS 通过一个整型的状态值来表示同步器的状态。不同的同步器可以根据自身的需求自定义状态的含义和取值范围。状态的变化和转换由AQS内部管理和控制。
  2. 独占模式和共享模式:AQS支持两种基本的同步模式,即独占模式和共享模式,意为能否被多个线程同时访问同步器
  3. 等待队列:AQS 内部维护了一个等待队列,用于管理等待获取同步器的线程。这个队列是一个FIFO队列,当线程无法获取到同步器时,会被加入到等待队列中,等待被唤醒。
  4. 状态变更和线程阻塞:当一个线程尝试获取同步器时,如果同步器的状态不符合要求(例如已被占用),线程会被置为阻塞状态,并被加入到等待队列中。当其他线程释放同步器或者发生其他特定条件时,阻塞的线程会被唤醒。
  5. CAS操作:AQS 使用CAS(Compare and Swap)操作来确保状态的原子更新。这样可以避免使用锁或其他同步机制,提高并发性能

AQS 核心数据结构 CLH 锁的详细解读可见Java AQS 核心数据结构-CLH 锁

AQS 模板方法

AQS 使用了模板方法的设计模式,使用者继承AbstractQueuedSynchronizer并重写指定的方法,可以定制化同步器的行为,适应不同的并发场景

基本方法如下:

方法名描述
protected boolean isHeldExclusively()该线程是否正在独占资源。只有用到Condition才需要去实现它。
protected boolean tryAcquire(int arg)独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。
protected boolean tryRelease(int arg)独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。
protected int tryAcquireShared(int arg)共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg)共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。

自定义同步器实现的相关方法,并通过修改State字段来实现多线程的独占模式或者共享模式

AQS 锁流程

AQS 使用int 成员变量state表示同步状态,通过内置的FIFO 线程等待/等待队列来完成获取资源线程的排队工作。本质上是 CAS + volatile

state变量由volatile修饰,用于展示当前临界资源的获锁情况

// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

状态信息 state 可以通过protected final类型的getState()、setState()和compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

//返回同步状态的当前值
protected final int getState() {
     return state;
}
 // 设置同步状态的值
protected final void setState(int newState) {
     state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 资源共享方式

AQS 提供了对资源的共享方式的支持,可以通过继承AQS来实现不同类型的共享同步器。AQS支持两种基本的资源共享方式:独占模式(Exclusive mode)共享模式(Shared mode)

  • 独占模式:在独占模式下,同一时刻只有一个线程可以获取到资源的访问权,其他线程需要等待当前线程释放资源才能获取。AQS通过tryAcquiretryRelease方法来实现独占模式的同步逻辑,如ReentrantLock
  • 共享模式:在共享模式下,多个线程可以同时获取资源的访问权。这种模式适用于多个线程可以同时读取资源的情况,但对于写操作仍然需要互斥。AQS通过tryAcquireSharedtryReleaseShared方法来实现共享模式的同步逻辑,如CountDownLatchSemaphore

AQS并不直接限制资源的共享方式,而是提供了底层的同步原语和模板方法,使开发者能够根据具体需求实现自定义的同步器。通过重写AQS的模板方法,可以实现不同类型的资源共享方式,包括独占模式和共享模式,以满足不同的并发场景和需求

AQS 源码

此段修改自AbstractQueuedSynchronizer数据结构,详细源码分析可参考此文

AbstractQueuedSynchronizer类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。

AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配

  • Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度
  • Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue

类的继承关系

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

AbstractOwnableSynchronizer抽象类的源码:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    
    // 版本序列号
    private static final long serialVersionUID = 3737899427754241961L;
    // 构造方法
    protected AbstractOwnableSynchronizer() { }
    // 独占模式下的线程
    private transient Thread exclusiveOwnerThread;
    
    // 设置独占线程 
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    // 获取独占线程 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThreadgetExclusiveOwnerThread方法,这两个方法会被子类调用。

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍

类的内部类 - Node类
static final class Node {
    // 模式,分为共享与独占
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;        
    // 结点状态
    // CANCELLED,值为1,表示当前的线程被取消
    // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
    // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
    // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
    // 值为0,表示当前节点在sync队列中,等待着获取锁
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;        

    // 结点状态
    volatile int waitStatus;        
    // 前驱结点
    volatile Node prev;    
    // 后继结点
    volatile Node next;        
    // 结点所对应的线程
    volatile Thread thread;        
    // 下一个等待者
    Node nextWaiter;
    
    // 结点是否在共享模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 获取前驱结点,若前驱结点为空,抛出异常
    final Node predecessor() throws NullPointerException {
        // 保存前驱结点
        Node p = prev; 
        if (p == null) // 前驱结点为空,抛出异常
            throw new NullPointerException();
        else // 前驱结点不为空,返回
            return p;
    }
    
    // 无参构造方法
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    // 构造方法
        Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 构造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下:

结点状态结点状态值含义
CANCELLED1当前的线程被取消
SIGNAL-1当前节点的后继节点包含的线程需要运行,需要进行unpark操作
CONDITION-2当前节点在等待condition,也就是在condition queue
PROPAGATE-3当前场景下后续的acquireShared能够得以执行
-0当前节点在sync queue中,等待获取锁
类的内部类 - ConditionObject类

在 AQS 中,ConditionObject是用于支持条件变量的内部类。它实现了Condition接口,并提供了条件等待和通知的功能

ConditionObject的作用是允许线程在满足特定条件之前等待,并在条件满足时被其他线程通知。它为多线程之间的协调提供了一种机制

public interface Condition {

    // 使当前线程进入等待状态,直到接收到信号或被中断。在等待期间,当前线程会释放锁,并加入到条件等待队列中
    void await() throws InterruptedException;
    
    // 与await()类似,但是忽略中断的影响,即不会响应中断
    void awaitUninterruptibly();
    
    // 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    
    // 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    
    // 使当前线程进入等待状态,直到接收到信号、被中断或等待到指定的时间
    boolean awaitUntil(Date deadline) throws InterruptedException;
    
    // 唤醒一个等待在该条件上的线程,使其从等待状态变为可运行状态
    void signal();
    
    // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
    void signalAll();
}

ConditionObject的使用通常需要与Lock接口配合使用,通过调用LocknewCondition()方法获取一个Condition对象。**在调用条件对象的等待和通知方法之前,需要先获取相应的锁。**通过使用条件变量,线程可以更精确地控制线程的等待和唤醒,以及执行特定的操作序列

类的核心方法 - acquire

AQS(AbstractQueuedSynchronizer)中的acquire方法是用于获取同步状态(获取锁)的核心方法

  1. 调用tryAcquire方法尝试获取同步状态
  2. 如果tryAcquire方法返回失败(负值),则调用addWaiter方法将当前线程包装成一个节点(Node)并加入到等待队列中
  3. 调用acquireQueued方法使线程进入等待状态,直到获取到同步状态或被中断。acquireQueued方法会不断尝试获取同步状态,如果获取失败,则将节点加入到等待队列中,并使线程阻塞。在等待期间,线程会不断自旋尝试获取同步状态
  4. 当线程被唤醒后,会再次调用tryAcquire方法尝试获取同步状态。如果成功获取到同步状态,则调用selfInterrupt方法自我中断,以响应中断的请求
  5. 如果获取同步状态失败或被中断,则线程会继续进入循环等待,直到成功获取同步状态或抛出异常

acquire方法的伪代码:

public void acquire(int arg) {
    if (tryAcquire(arg)) {
        return;
    }
    Node node = addWaiter();
    for (;;) {
        if (acquireQueued(node, arg)) {
            selfInterrupt();
            return;
        }
    }
}

acquireQueued - 等待队列中线程出队列时机流程图

等待队列中线程出队列时机

类的核心方法 - release

AQS(AbstractQueuedSynchronizer)中的release方法是用于释放同步状态(释放锁)的核心方法。下面详细介绍了release方法的代码步骤:

  1. 调用tryRelease方法尝试释放同步状态。tryRelease方法是一个抽象方法,由子类实现具体的释放逻辑。如果tryRelease方法返回 true,表示释放同步状态成功,直接返回。
  2. 如果tryRelease方法返回 false,表示释放同步状态失败,此时会抛出IllegalMonitorStateException异常,表示当前线程未持有该同步状态
  3. 调用unparkSuccessor方法唤醒等待队列中的后继节点。unparkSuccessor方法会找到等待队列中的下一个有效节点(即状态不为取消的节点),并调用LockSupport.unpark方法唤醒该节点对应的线程

release方法的伪代码

public void release(int arg) {
    if (tryRelease(arg)) {
        unparkSuccessor(node);
    } else {
        throw new IllegalMonitorStateException();
    }
}

Lcok 框架和工具类

Lock框架和Tools类

自定义 Lock

下述代码中,SimpleLock类包装了一个内部类Sync,它继承了AbstractQueuedSynchronizerSync类重写了tryAcquiretryReleaseisHeldExclusively方法来实现锁的获取、释放和判断当前线程是否持有锁的逻辑

public class SimpleLock {
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // 尝试获取锁,如果成功返回true,否则返回false
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 尝试释放锁,如果成功返回true,否则返回false
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            // 判断当前线程是否持有锁
            return getState() == 1;
        }

        final boolean isLocked() {
            // 是否已上锁
            return getState() != 0;
        }
    }
}

使用实例:

public class Main {
    private static final SimpleLock lock = new SimpleLock();

    public static void main(String[] args) {
        // 创建两个线程并启动
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                // 执行需要互斥的代码块
                System.out.println("Thread 1: Lock acquired");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("Thread 1: Lock released");
            }
        });

        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                // 执行需要互斥的代码块
                System.out.println("Thread 2: Lock acquired");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("Thread 2: Lock released");
            }
        });

        thread1.start();
        thread2.start();
    }
}

这只是一个简单的示例,实际上,AQS的应用要更加复杂。在实际场景中,你可能需要考虑更多的因素,例如可重入性、公平性等。如果需要在生产环境中使用锁,优先考虑使用Java标准库提供的java.util.concurrent.locks包中的锁实现,如ReentrantLock

ReentranLock

ReentrantLock 提供了可见性的保证,即当一个线程释放锁时,会将对变量的修改刷新到主内存中,以便其他线程获取锁后能够看到最新的值。

如果需要在锁范围外对成员变量的修改对其他线程可见,可以将该变量声明为volatile

公平锁/非公平锁

ReentrantLock可以以公平锁(fair lock)模式或非公平锁(nonfair lock)模式运行。这两种模式的区别在于线程获取锁的顺序。

  • 公平锁模式:在公平锁模式下,当多个线程等待获取锁时,锁将按照线程等待的先后顺序进行分配。 也就是说,等待时间最长的线程将最先获取到锁。公平锁模式确保了线程获取锁的公平性,避免了饥饿现象。但是,公平锁模式的性能相对较低,因为需要维护一个有序的等待队列。
  • 非公平锁模式:在非公平锁模式下,线程获取锁的顺序是不确定的,不一定按照等待时间的先后顺序分配。 线程有一定几率在尝试获取锁时绕过其他等待线程而直接获取到锁。非公平锁模式的性能相对较高,因为它减少了线程间切换和调度造成的开销,但可能会导致某些线程长时间等待

非公平锁尝试获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部;公平锁则

// 
static final class NonfairSync extends Sync {
    // 版本号
    private static final long serialVersionUID = 7316153563782823691L;

    // 获得锁
    final void lock() {
        if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
            // 把当前线程设置独占了锁
            setExclusiveOwnerThread(Thread.currentThread());
        else // 锁已经被占用,或者set失败
            // 以独占模式获取对象,忽略中断
            acquire(1); 
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
加锁流程

  1. 通过ReentrantLock的加锁方法lock进行加锁操作
  2. 会调用到内部类Synclock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的lock方法,本质上都会执行AQS的acquire方法
  3. AQS的acquire方法会执行tryAcquire方法,因此执行了自定义同步器ReentrantLock中的tryAcquire方法,而ReentrantLock根据锁类型不同执行不同的tryAcquire
  4. tryAcquire是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟ReentrantLock自定义同步器无关
交替输出案例
public static void main(String[] args) {
    char[] a = "ABCDE".toCharArray();
    char[] b = "12345".toCharArray();
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    new Thread(() -> {
        lock.lock();
        try {
            for (char c : a) {
                System.out.print(c);
                condition.signal();
                condition.await();
            }
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "t1").start();

    new Thread(() -> {
        lock.lock();
        try {
            for (char c : b) {
                System.out.print(c);
                condition.signal();
                condition.await();
            }
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "t2").start();
}

参考资料:

  1. Java 并发编程实战
  2. JUC锁: 锁核心类AQS详解
  3. ReentrantLock与synchronized
  4. 从ReentrantLock的实现看AQS的原理及应用
  5. Java AQS 核心数据结构-CLH 锁

文章来源:https://blog.csdn.net/why_still_confused/article/details/135351204
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。