基于ReentrantLock详解AQS源码

2023-12-13 06:02:56

  • AQS的全称是AbstractQueuedSynchronizer,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架。
  • AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子更新这个状态变量state即可以实现加锁解锁操作。

AQS成员变量:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

    protected AbstractQueuedSynchronizer() { }
    
    static final class Node {
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;

        // 标识线程已经取消
        static final int CANCELLED =  1;
        // 标识线程需要去unparking它的后继节点
        static final int SIGNAL    = -1;
        // 标识线程等待在一个条件上
        static final int CONDITION = -2;
        // 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
        static final int PROPAGATE = -3;

        // 当前节点保存的线程对应的等待状态(node状态可选值:0,SIGNAL,CANCELLED,CONDITION,PROPAGATE)
        // waitStatus == 0 默认状态
        // waitStatus > 0 取消状态
        // waitStatus == -1 表示当前node如果是head节点时,释放锁之后需要唤醒它的后继节点
        volatile int waitStatus;

        // 指向前驱节点
        volatile Node prev;
        // 指向后继节点
        volatile Node next;

        // 封装当前Node的线程,排队的线程
        volatile Thread thread;

        // 下一个等待在条件上的节点(Condition锁时使用)
        Node nextWaiter;

        // 当前节点是否是共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    // 队列的头节点: 任何时刻,头结点对应的线程就是当前持锁线程~
    private transient volatile Node head;

    // 队列的尾节点:(阻塞队列不包含头结点head,是从head.next 开始,到 tail 结束,这个区间是阻塞队列)~
    private transient volatile Node tail;

    // 控制加锁解锁的状态变量
    // 独占模式下:0 表示未加锁状态, >0 表示已加锁状态
    private volatile int state;
    
    // 独占模式下:表示当前持有锁的线程~
    private transient Thread exclusiveOwnerThread;
}

一、公平锁实现FairSync:

加锁:

ReentrantLock lock = new ReentrantLock();
lock.lock();
final void lock() {
    acquire(1);
}
public final void acquire(int arg) {
    //条件1:线程去尝试获取锁,获取成功则返回true,获取失败则返回false
    //条件2:
    // 	条件2.1:addWaiter方法,将当前线程封装成node入队,并且返回当前入队的节点
    // 	条件2.2:入队后调用 acquireQueued方法 (该方法包含挂起当前线程、以及线程唤醒后相关的逻辑)
    // 		   (令当前线程不断去竞争资源,直到成功获取锁才停止自旋)
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 中断当前线程
        selfInterrupt();
    }
}
// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取竞态变量的值
    int c = getState();
    // c==0,表示AQS处于无锁状态
    if (c == 0) {
        // 条件1:hasQueuedPredecessors(),判断队列中是否有等待的线程,这里是公平锁才要判断,如果是非公平锁没有这段逻辑
        // true:有等待的线程
        // false:没有等待的线程
        
        // 条件2:表示当前队列没有等待的线程,当前线程进行cas尝试获取锁,如果case设置state=1成功,说明获取到锁
        
        // 公平锁这里尝试获取锁,仅有队列为空或者队列中第一个节点是自己才进行锁的争抢,否则就要排队了
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 条件1和条件2成立,说明当前线程获取到了锁,设置当前线程为锁的持有者
            setExclusiveOwnerThread(current);
            // 返回true,说明获取到了锁
            return true;
        }
    }
    // getExclusiveOwnerThread:返回当前持有锁的线程
    // 当前线程是持有锁的线程,说明可重入了
    else if (current == getExclusiveOwnerThread()) {
        // c + acquires:重入次数+1
        int nextc = c + acquires;
        // 防止无限可重入导致nextc超过int的最大值,所以抛出异常
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 设置state值
        setState(nextc);
        // 返回true:表示当前线程获取到了锁
        return true;
    }
    // 返回false,说明当前线程没有获取到锁
    return false;
}
// 判断队列中是否有等待的线程
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    // 条件1:如果h==t:表示队列为空,返回false,没有线程等待
    // 条件2:如果h.next != null && s.thead == Thread.currentThread():表示队列不为空,当前线程是队列中等待的第一个节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
// 将当前节点加入等待队列
private Node addWaiter(Node mode) {
    // 创建一个Node节点,传入当前线程和模式,这里是独占模式
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点
    Node pred = tail;
    // 如果尾节点不为空,说明队列已经存在节点,则尝试将当前节点链接到队尾,链接成功则返回当前节点
    if (pred != null) {
        // 设置当前节点的前驱为尾节点
        node.prev = pred;
        // cas设置新的尾节点为node
        if (compareAndSetTail(pred, node)) {
            // 如果设置新的尾节点成功,则将旧的尾节点的后继指向新的尾节点,形成双向链表,此时当前节点加入队列成功
            pred.next = node;
            // 返回当前节点
            return node;
        }
    }
    // 执行到这里有以下2种情况:
    // 1.tail == null 当前队列是空队列
    // 2.cas设置当前newNode 为 tail 时失败了,被其他线程抢先一步了
    // 自旋入队,只有入队成功才结束自旋:
    enq(node);
    return node;
}
// 自旋入等待队列
private Node enq(final Node node) {
    for (;;) { //自旋
        // 获取尾节点
        Node t = tail;
        // 如果尾节点为空,说明当前队列是空的
        if (t == null) { // Must initialize
            // cas初始化head节点,为一个空节点
            if (compareAndSetHead(new Node())){
             	// 初始化成功,则头节点和尾节点相等
                tail = head;   
            }
        } else { // 尾节点不为空,说明队列中有线程在等待
            // 设置当前节点的前驱为尾节点
            node.prev = t;
            // cas设置新的尾节点为node
            if (compareAndSetTail(t, node)) {
                // 如果设置新的尾节点成功,则将旧的尾节点的后继指向新的尾节点,形成双向链表,此时当前节点加入队列成功
                t.next = node;
                // 返回当前节点
                return t;
            }
        }
    }
}
// 抢占锁+挂起线程+线程唤醒之后执行的地方
final boolean acquireQueued(final Node node, int arg) {
    // false:表示当前线程抢占锁没有发生异常
    // true:表示当前线程抢占锁发生异常,需要将当前节点出队
    boolean failed = true;
    try {
        // 当前线程是否被中断唤醒的,因为执行到parkAndCheckInterrupt时会挂起线程,之后会被别的线程唤醒,做一个标记
        boolean interrupted = false;
        // 自旋进行锁抢占
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 条件1:p == head,即当前节点的前驱节点是否是头节点
            // true: 说明当前节点是头节点的第一个后继节点,有资格去争抢锁
            // false: 说明当前节点不是头节点的第一个后继节点,还是老老实实排队吧,等有资格再说
         	
            // 条件2:tryAcquire(arg),尝试去获取锁
         
            if (p == head && tryAcquire(arg)) {
                // 抢到锁了,则将head指向下一个节点,并把oldHead置空
                // node为头节点,清空节点的内容为一个空节点
                setHead(node);
                // 将旧的头节点的后继设置为null
                p.next = null; // help GC
                // 当前线程抢占锁成功,没有发生异常
                failed = false;
                // 当前线程没有被中断 false
                return interrupted;
            }
            // shouldParkAfterFailedAcquire:获取锁失败,需要被挂起  
            // 第一次进入方法,一般不会挂起,会设置pred.waitStatus=-1,自旋第二次进入就会被挂起
            // true:需要被挂起
            // false: 不需要挂起
            
            // parkAndCheckInterrupt:挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()){
                // 设置中断状态 true,表示当前线程是被中断信号唤醒的
                interrupted = true;
            }
        }
    } finally {
        // 抢占锁出现异常,需要将当前节点从队列中移除
        if (failed){
         	cancelAcquire(node);   
        }
    }
}
// 设置头节点,并且置空头节点的线程和前置指针
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
// 判断当前线程是否需要被挂起 true:需要挂起  false:不需要挂起
// pred:前驱节点
// node:当前线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // ws==-1:前驱节点释放锁之后回去唤醒下一个节点
    if (ws == Node.SIGNAL)
        // 挂起当前线程
        return true;
    // ws>0:表示前驱节点处于取消状态
    if (ws > 0) {
        // 链表从后往前走,跳过已经取消的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 直到遇到ws==0:表示默认状态,然后把pred的后驱指向当前node
        // 本次调用,node不需要被挂起,下次重试进来就会被挂起
        pred.next = node;
    } else {
        // 此时ws一定是0,但是不立即挂起它
        // cas将pred.waitStatus设置为-1
        // 下次重试进来时当前线程会被挂起
        
        // 将当前node节点的前一个节点的waitStatus设置为-1,表示前一个节点释放锁的时候会唤醒当前节点
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 不需要被挂起
    return false;
}
// 挂起当前线程,并返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
// 将node从队列中取消掉
private void cancelAcquire(Node node) {
    // 当前node为空,则忽略
    if (node == null) return;
	
    // 当前node线程置空
    node.thread = null;

    // 从后往前找,跳过已经取消的Node节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext:表示不是取消节点的下一个节点,即指向第一个取消的节点
    Node predNext = pred.next;

    // 设置当前节点为已取消状态
    node.waitStatus = Node.CANCELLED;

    // 如果取消的是最后一个节点,则将尾节点cas设置为最后一个未取消的节点,跳过已取消的节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // cas把pred的next节点设置为null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 走到else代表当前节点不是tail节点,或者是cas操作的时候tail发生了变化
        // 如果不是tail节点,不能直接把tail节点指向到上面while循环得出的prev节点
        int ws;
        // 这里是的if代码块,是为了尝试一次,如果不成功再去复杂的处理。
        // 这里的if判断条件如下:
            // 1.如果上面while循环得到的pred节点不是head节点
            // 2.如果上面while循环得到的pred节点为-1,如果不为-1,cas改变成-1也。
            // 3.如果上面while循环得到的pred节点的线程指向不为null(如果为null代表在取消的过程中)
        // 因为&&是拼接,所以上面任意一个条件为false就会进入到else条件中。
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            
            Node next = node.next;
            // 当前节点的下一个节点如果不为空,且下一个节点的状态为-1,说明node是一个中间节点,需要将node后面的节点链接起来
            if (next != null && next.waitStatus <= 0)
                // cas将pred.next与next连接起来
                compareAndSetNext(pred, predNext, next);
        } else {
            // 直接唤醒当前节点的下一个节点。
            // 唤醒的目的是为了去执行shouldParkAfterFailedAcquire方法去处理取消节点。
            unparkSuccessor(node);
        }
		// 把当前节点的下一个节点指向自己
        node.next = node; // help GC
    }
}
// 唤醒当前节点的下一个节点,node为pred前驱节点
private void unparkSuccessor(Node node) {
    // 将node.waitStatus设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //s为node节点的下一个节点
    Node s = node.next;
    // 下一个节点为空或者下一个节点已经取消
    // 条件1:s == null
    // s 什么时候为null?
    // 1.当前节点就是tail节点时,s==null
    // 2.当前节点入队未完成时(1.设置新节点的prev指向pred  2.CAS设置新节点为tail  3.(未完成)pred.next -> 新节点)
    // 需要找到可以被唤醒的节点...
    
    // 条件2:s.waitStatus > 0 前提是 s == null
    // 如果条件2成立,则说明当前node节点的后继节点是取消状态,需要找一个合适的可以被唤醒的节点...
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从队尾往前找,找到离队头最近的第一个正常节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果s不为空,说明找到了一个可以唤醒的节点,则进行唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁:

lock.unlock();
public void unlock() {
     sync.release(1);
}
public final boolean release(int arg) {
    // 尝试释放锁,true:释放锁成功  false:释放锁失败
    if (tryRelease(arg)) {
        // 判断头节点是否是正常节点,如果head是正常节点则唤醒头节点的下一个节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒操作
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 尝试释放锁
protected final boolean tryRelease(int releases) {
    // 获取state-1
    int c = getState() - releases;
    // 不是当前线程释放锁,则报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 如果c==0,说明是可重入最后一次释放锁,可以释放锁
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置state=0,说明AQS属于无锁状态
    setState(c);
    return free;
}

至此,ReentrantLock的公平锁实现方式源码已经注释完毕!


二、非公平锁实现NonfairSync:

公平锁的实现,在加锁时,只要队列中有等待的线程,就需要排队进行等待,很公平。

对于非公平锁,在加锁的时候会先执行cas抢占锁的逻辑,此时线程会跟头节点的下一个节点进行抢占锁,体现非公平的表现。

final void lock() {
    // 非公平锁加锁直接先cas抢一波,抢不到再进行入队操作
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
public final void acquire(int arg) {
    // tryAcquire尝试获取锁,非公平锁和公平锁的实现不一样
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// 非公平锁
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 公平锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 差别在于有无hasQueuedPredecessors方法,这是判断队列中是否有等待的线程
// 公平锁需要判断,进行有序排队,而非公平锁不需要排队,直接先抢占一波,抢占不到再入队

三、图解案例:

下面用一幅图来描述三个线程加锁的流程,线程一加锁成功,线程二,三加锁失败挂起入队,然后线程一释放锁,唤醒线程二,然后线程二释放锁,唤醒线程三:

1.线程一抢到锁成功,设置Sate=1,并且设置持有锁线程为线程一
在这里插入图片描述
2.线程二加锁失败,由于头节点不存在,所以线程二在入队时会顺便创建头节点,然后再链接到头节点后边,并且设置前一个节点的waitStatus=-1,表示前一个节点释放锁后会唤醒后一个节点,然后挂起线程二:
在这里插入图片描述
3.线程三加锁失败,则入队挂起:
在这里插入图片描述
4.线程一释放锁,设置头节点的waitStatus=0,然后唤醒下一个节点进行抢占锁,如果线程二抢到了锁:
在这里插入图片描述
接着将线程二的节点作为头节点,head指针指向这个节点,并且断开原来的头节点:
在这里插入图片描述
5.线程二释放锁,唤醒下一个节点进行抢占锁,线程三抢占到了锁,断开原来头节点,重新设置新的头节点:
在这里插入图片描述6.线程三释放锁,把AQS中的state设置为0,清空持有锁线程,队列中的节点还是存在一个头节点,这个节点的内容都是空的,只要头节点创建出来了就一直存在,持有锁的线程会关联头节点。

7.上述流程可以说是公平锁的抢占方式,每次都是队列头节点的下一个节点抢到锁,严格排队抢锁,如果是非公平锁,则在唤醒head的下一个节点进行抢锁时,其他线程也可以在此期间抢锁,如果被其他线程抢到锁,则head的下一个节点还是会重新挂起,然后新的抢占锁的线程会重新关联head节点。


参考:

https://zhuanlan.zhihu.com/p/463668014

https://csp1999.blog.csdn.net/article/details/116604866

https://xiaohuang.blog.csdn.net/article/details/130046506

文章来源:https://blog.csdn.net/Linging_24/article/details/134961383
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。