【源码解析】从Conditon角度聊聊AQS原理

2023-12-13 23:49:18

前几篇文章,我们详细描述了线程池的原理以及核心代码,以及阻塞队列,ReentrantLock的核心流程,本篇主要介绍下lock锁中的condition

公平锁和非公平锁

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

非公平锁

    //非公平锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
             //CAS
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //公平锁 需要判断当前队列是否有线程在等待
if (compareAndSetState(0, acquires)) {} //非公平锁 不需要

非公平锁,会先进行CAS获取锁,抢到了就直接返回。

Condition

condition需要依赖于ReentrantLock,调用await 进入等待或者是singale唤醒,都需要获取锁才可以操作。

一个ReentrantLock可以创建多个condition条件。

    public Condition newCondition() {
        return sync.newCondition();
    }

		final ConditionObject newCondition() {
        return new ConditionObject();
    }

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
  		 
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

condition核心流程其实就是两个,一个是await以及signal。
其实就是两个队列,一个是AQS本身的等待队列(双向链表),另一个就是Condition条件队列。
await() 会将已经获取lock锁的线程,释放锁,然后封装成一个Node节点,添加到条件队列尾部中。挂起。
signal() 将condition队列的头节点移动到AQS等待节点尾部,等待再次获取锁。
在这里插入图片描述

lock.lock();
await();
  // 1.释放当前lock锁,从AQS队列中删除
  // 2.将当前节点封装成Node 添加到条件队列尾部 进行阻塞。 
  // 3.当condition调用siganl的时候,删除在条件队列中的节点,转移到AQS中,等待获取锁。
  // 4.获取到锁之后,执行剩余流程,进行unlock();
lock.unlock();

在这里插入图片描述

 		//前驱节点的引用
        volatile Node prev;
        
        //后继节点的引用
        volatile Node next;
        
        //本线程
        volatile Thread thread;

        // 这个是在condition中用来构建单向链表
        Node nextWaiter;

await

		// 持有锁的线程,执行await
        //1.将当前线程封装为Node,仍到Condition的单向链表中
        //2.将锁资源释放  确认Node没有在双向链表中
        //3.挂起线程
        public final void await() throws InterruptedException {
            //判断线程是否中断 中断异常。
            if (Thread.interrupted())
                throw new InterruptedException();
            //构造一个新的等待队列Node 加入到队尾
            Node node = addConditionWaiter();
            //释放当前线程的独占锁,不管重入几次,state = 0
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //当前节点没有在同步队列上,还没有被singal 将当前线程阻塞
            // 双向链表是否在同步队列,
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 被中断直接推出自旋
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
             // 退出上面 自旋 当前节点已经在同步队列中了,
            // 但是当前节点不一定在同步队列队首, acquireQueued将阻塞直到当前节点成为队首
            // 即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

节点添加到等待队列

		//尾部方式插入到队尾
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 如果条件队列的最后一个节点取消了 清楚
            // 单向链表有值,走这个逻辑
            if (t != null && t.waitStatus != Node.CONDITION) {
                //  这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
                //  将当前Node脱离单向链表
                unlinkCancelledWaiters();
                //  更换新的
                t = lastWaiter;
            }
            // 将当前Node 封装成Node
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 当前Node是否为空,放到单线链表中
            // 这个操作持有锁的线程才能做
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

完全释放独占锁

    // 完全释放独占锁
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //获取state的值
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

等待进入阻塞队列

            while (!isOnSyncQueue(node)) {
                // 当前线程park 等待singal()
                LockSupport.park(this);
                // 被中断直接推出自旋
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
    // 判断当前节点是否在阻塞队列中
    final boolean isOnSyncQueue(Node node) {
        //  如果当前节点状态是CONDITION或node.prev是null,
        //  则证明当前节点在等待队列上而不是同步队列上。之所以可以用node.prev来判断,
        //  是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //  如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的
        if (node.next != null) // If has successor, it must be on queue
            return true;
            
        //前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是不是当前节点。
        return findNodeFromTail(node);
    }

signal

上面的操作,可以看到线程lock.lock 之后被await 阻塞了,那么就需要另外一个线程进行唤醒。

        // 将当前线程唤醒,并且需要仍到AQS双向链表中,同时断开现在存在的单向链表
        public final void signal() {
            // 当前执行singel的线程,必须是持有锁的线程
            if (!isHeldExclusively())
                // 当前执行singal的方法 没有持有锁
                throw new IllegalMonitorStateException();
            // 拿到了第一个被await的Node
            Node first = firstWaiter;
            // 如果没有first 为null , 没await 直接告辞结束
            if (first != null)
                //condition有 ,first节点进行唤醒
                doSignal(first);
        }
        // 从前往后 遍历 找到第一个需要转移的node
        private void doSignal(Node first) {
            do {
                // 判断当前Node是否是唯一的一个节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 断开存在的单向链表
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    // 将节点从条件队列转移到阻塞队列
    // true 成功转移
    // false 转移失败
    final boolean transferForSignal(Node node) {
        // 修改当前节点的状态 -2 > 0
        // node在添加的时候 是-2
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            // 修改失败
            return false;

        // sigal 将Node的state -2 -> 0
        // 执行enq方法,放入双向链表的内部 尾部
        Node p = enq(node);
        // 拿到上一个node的状态
        int ws = p.waitStatus;
        // 之前节点是取消状态
        // 节点正常 但是cas
        // 避免刚刚加入的AQS节点 无法被唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

获取独占锁

上面说了,当signal 将节点从conditon队列加入到阻塞队列中,这里的while循环就会跳出来。

			while (!isOnSyncQueue(node)) {
                // 当前线程park 等待singal()
                LockSupport.park(this);
                // 被中断直接推出自旋
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 退出上面 自旋 当前节点已经在同步队列中了,
            // 但是当前节点不一定在同步队列队首, acquireQueued将阻塞直到当前节点成为队首
            // 即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取node的prev节点
                final Node p = node.predecessor();
                // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
                if (p == head && tryAcquire(arg)) {
                    // //到这里说明刚加入到等待队列里面的node只有一个,并且此时获取锁成功,设置head为node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
                // // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

小结

总的来说,Condition的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

  • 构造一个新的等待队列节点加入到等待队列队尾
  • 释放锁,也就是将它的同步队列节点从同步队列队首移除
  • 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal()) 或被中断
  • 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
    当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:

  • 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
  • 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.

文章来源:https://blog.csdn.net/jia970426/article/details/134982849
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。