解密AQS实现

2023-12-24 07:13:13

AQS

什么是锁?

可重入锁理论
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的的内层方法会自动获取锁(前提是锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。

Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

将字分开解释:

可:可以

重:再次

入:进入

锁:同步锁

进入什么? - 进入同步域(即同步代码块/方法或显示锁锁定的代码)

一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。

自己可以获取自己的内部锁。

可重入锁的种类:

隐式锁(即synchronized关键字使用的锁)默认是可重入锁。

同步块
同步方法

Synchronized的重入的实现机理。

显式锁(即Lock)也有ReentrantLock这样的可重入锁。可重入锁的种类:

  • 隐式锁(即synchronized关键字使用的锁)默认是可重入锁。

    • 同步块
    • 同步方法
    public class ReentrantLockDemo2 {
        Object object = new Object();
        public void sychronizedMethod(){
       new Thread(()->{
           synchronized (object){
               System.out.println(Thread.currentThread().getName()+"\t"+"外层....");
               synchronized (object){
                   System.out.println(Thread.currentThread().getName()+"\t"+"中层....");
                   synchronized (object){
                       System.out.println(Thread.currentThread().getName()+"\t"+"内层....");
                   }
               }
           }
       },"Thread A").start();
    }
    
        public static void main(String[] args) {
            new ReentrantLockDemo2().sychronizedMethod();
        }
       }
    
Thread A	外层....
Thread A	中层....
Thread A	内层....

public class ReentrantLockDemo2 {

    public static void main(String[] args) {
        new ReentrantLockDemo2().m1();
        
    }
    
    public synchronized void m1() {
    	System.out.println("===外");
    	m2();
    }
    
    public synchronized void m2() {
    	System.out.println("===中");
    	m3();
    }
    
    public synchronized void m3() {
    	System.out.println("===内");
    	
    }
}

=========

Synchronized的重入的实现机理

每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。

当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。

在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。

当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。

显式锁(即Lock)也有ReentrantLock这样的可重入锁

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Phone2 implements Runnable{
    Lock lock = new ReentrantLock();
    /**
     * set进去的时候,就加锁,调用set方法的时候,能否访问另外一个加锁的set方法
     */
    public void getLock() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t get Lock");
            setLock();
        } finally {
            lock.unlock();
        }
    }
    public void setLock() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t set Lock");
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void run() {
        getLock();
    }
}

public class ReentrantLockDemo {
    public static void main(String[] args) {
        Phone2 phone = new Phone2();
        /**
         * 因为Phone实现了Runnable接口
         */
        Thread t3 = new Thread(phone, "t3");
        Thread t4 = new Thread(phone, "t4");
        t3.start();
        t4.start();
    }
}

t3	 get Lock
t3	 set Lock
t4	 get Lock
t4	 set Lock


LockSupport是什么

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park()和 unpark()的作用分别是阻塞线程和解除阻塞线程。
总之,比wait/notify,await/signal更强。
3种让线程等待和唤醒的方法

方式1:使用Object中的wait()方法让线程等待,使用object中的notify()方法唤醒线程
方式2:使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程
方式3:LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程

waitNotify限制

Object类中的wait和notify方法实现线程等待和唤醒

public class WaitNotifyDemo {

	static Object lock = new Object();
	
	public static void main(String[] args) {
		new Thread(()->{
			synchronized (lock) {
				System.out.println(Thread.currentThread().getName()+" come in.");
				try {
					lock.wait();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			System.out.println(Thread.currentThread().getName()+" 换醒.");
		}, "Thread A").start();
		
		new Thread(()->{
			synchronized (lock) {
				lock.notify();
				System.out.println(Thread.currentThread().getName()+" 通知.");
			}
		}, "Thread B").start();
	}
}

wait和notify方法必须要在同步块或者方法里面且成对出现使用,否则会抛出java.lang.IllegalMonitorStateException。

调用顺序要先wait后notify才OK。

awaitSignal限制

Condition接口中的await后signal方法实现线程的等待和唤醒,与Object类中的wait和notify方法实现线程等待和唤醒类似

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionAwaitSignalDemo {
	public static void main(String[] args) {
		ReentrantLock lock = new ReentrantLock();
		Condition condition = lock.newCondition();
		new Thread(()->{
			try {
				System.out.println(Thread.currentThread().getName()+" come in.");
				lock.lock();
				condition.await();				
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
			System.out.println(Thread.currentThread().getName()+" 换醒.");
		},"Thread A").start();
		new Thread(()->{
			try {
				lock.lock();
				condition.signal();
				System.out.println(Thread.currentThread().getName()+" 通知.");
			}finally {
				lock.unlock();
			}
		},"Thread B").start();
	}
	
}


Thread A come in.
Thread B 通知.
Thread A 换醒.

await和signal方法必须要在同步块或者方法里面且成对出现使用,否则会抛出java.lang.IllegalMonitorStateException。

调用顺序要先await后signal才OK。

LockSupport方法介绍

传统的synchronized和Lock实现等待唤醒通知的约束

线程先要获得并持有锁,必须在锁块(synchronized或lock)中
必须要先等待后唤醒,线程才能够被唤醒
LockSupport类中的park等待和unpark唤醒

Basic thread blocking primitives for creating locks and other
synchronization classes. This class associates, with each thread that
uses it, a permit (in the sense of the Semaphore class). A call to
park will return immediately if the permit is available, consuming it
in the process; otherwise it may block. A call to unpark makes the
permit available, if it was not already available. (Unlike with
Semaphores though, permits do not accumulate. There is at most one.)

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零。
可以把许可看成是一种(0.1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
通过park()和unpark(thread)方法来实现阻塞和唤醒线程的操作 park()/park(Object blocker) -
阻塞当前线程阻塞传入的具体线程

public class LockSupport {
    public static void park() {
        UNSAFE.park(false, 0L);
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    } 
}

permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回。
unpark(Thread thread) - 唤醒处于阻塞状态的指定线程

public class LockSupport {
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
}

  • 调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,pemit值还是1)会自动唤醒thead线程,即之前阻塞中的LockSupport.park()方法会立即返回。
LockSupport解析
public class LockSupportDemo {
	public static void main(String[] args) {
		Thread a = new Thread(()->{
//			try {
//				TimeUnit.SECONDS.sleep(2);
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
			System.out.println(Thread.currentThread().getName() + " come in. " + System.currentTimeMillis());
			LockSupport.park();
			System.out.println(Thread.currentThread().getName() + " 换醒. " + System.currentTimeMillis());
		}, "Thread A");
		a.start();	
		Thread b = new Thread(()->{
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			LockSupport.unpark(a);
			System.out.println(Thread.currentThread().getName()+" 通知.");
		}, "Thread B");
		b.start();
	}
	
}


输出结果:

Thread A come in.
Thread B 通知.
Thread A 换醒.

正常 + 无锁块要求。
先前错误的先唤醒后等待顺序,LockSupport可无视这顺序。

重点说明

  • LockSupport是用来创建锁和共他同步类的基本线程阻塞原语。

  • LockSuport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻寨之后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe中的native代码。

  • LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞的过程

  • LockSupport和每个使用它的线程都有一个许可(permit)关联。permit相当于1,0的开关,默认是0,

  • 调用一次unpark就加1变成1, 调用一次park会消费permit,也就是将1变成0,同时park立即返回。

如再次调用park会变成阻塞(因为permit为零了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累凭证。

形象的理解

线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。

  • 当调用park方法时
  • 如果有凭证,则会直接消耗掉这个凭证然后正常退出。
  • 如果无凭证,就必须阻塞等待凭证可用。
  • 而unpark则相反,它会增加一个凭证,但凭证最多只能有1个,累加无放。

面试题

为什么可以先唤醒线程后阻塞线程?

因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。

为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?

因为凭证的数量最多为1(不能累加),连续调用两次 unpark和调用一次
unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。

AQS理论初步

是什么?AbstractQueuedSynchronizer 抽象队列同步器。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
}

是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。

在这里插入图片描述

CLH:Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。

AQS能干嘛

AQS为什么是JUC内容中最重要的基石?

和AQS有关的
在这里插入图片描述
在这里插入图片描述

进一步理解锁和同步器的关系

锁,面向锁的使用者 - 定义了程序员和锁交互的使用层APl,隐藏了实现细节,你调用即可

同步器,面向锁的实现者 - 比如Java并发大神DougLee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
能干嘛?
加锁会导致阻塞 - 有阻塞就需要排队,实现排队必然需要有某种形式的队列来进行管理
解释说明:

抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupportpark)的方式,维护state变量的状态,使并发达到同步的控制效果。

在这里插入图片描述

AQS源码体系 -1

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.

AbstractQueuedSynchronizer (Java Platform SE 8 )

  • 提供一个框架来实现阻塞锁和依赖先进先出(FIFO)等待队列的相关同步器(信号量、事件等)。此类被设计为大多数类型的同步器的有用基础,这些同步器依赖于单个原子“int”值来表示状态。子类必须定义更改此状态的受保护方法,以及定义此状态在获取或释放此对象方面的含义。给定这些,这个类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但是只有使用方法getState()、setState(int)和compareAndSetState(int,int)操作的原子更新的’int’值在同步方面被跟踪。

有阻塞就需要排队,实现排队必然需要队列

  • AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFo队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node,节点来实现锁的分配,通过CAS完成对State值的修改。
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

     * Creates a new {@code AbstractQueuedSynchronizer} instance
    protected AbstractQueuedSynchronizer() { }

     * Wait queue node class.
    static final class Node {

     * Head of the wait queue, lazily initialized.  Except for
    private transient volatile Node head;

     * Tail of the wait queue, lazily initialized.  Modified only via
    private transient volatile Node tail;

     * The synchronization state.
    private volatile int state;

     * Returns the current value of synchronization state.
    protected final int getState() {

     * Sets the value of synchronization state.
    protected final void setState(int newState) {

     * Atomically sets synchronization state to the given updated
    protected final boolean compareAndSetState(int expect, int update) {
         
    ...
}         

AQS源码体系 -2

AQS自身

AQS的int变量 - AQS的同步状态state成员变量

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
     * The synchronization state.
    private volatile int state;
    ...
}

state成员变量相当于银行办理业务的受理窗口状态。

  • 零就是没人,自由状态可以办理
  • 大于等于1,有人占用窗口,等着去
    AQS的CLH队列
  • CLH队列(三个大牛的名字组成),为一个双向队列
  • 银行候客区的等待顾客

The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used forspinlocks. We instead use them for blocking synchronizers, butuse the same basic tactic of holding some of the controlinformation about a thread in the predecessor of its node. A"status" field in each node keeps track of whether a threadshould block. A node is signalled when its predecessorreleases. Each node of the queue otherwise serves as aspecific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads aregranted locks etc though. A thread may try to acquire if it isfirst in the queue. But being first does not guarantee success;it only gives the right to contend. So the currently releasedcontender thread may need to rewait.

To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field. 本段文字出自AbstractQueuedSynchronizer内部类Node源码注释

等待队列是“CLH”(Craig、Landin和Hagersten)锁队列的变体。CLH锁通常用于旋转锁。相反,我们使用它们来阻止同步器,但是使用相同的基本策略,即在其节点的前一个线程中保存一些关于该线程的控制信息。每个节点中的“status”字段跟踪线程是否应该阻塞。当一个节点的前一个节点释放时,它会发出信号。否则,队列的每个节点都充当一个特定的通知样式监视器,其中包含一个等待线程。状态字段并不控制线程是否被授予锁等。如果线程是队列中的第一个线程,它可能会尝试获取。但是,第一并不能保证成功,它只会给人争取的权利。因此,当前发布的内容线程可能需要重新等待。

要排队进入CLH锁,您可以将其作为新的尾部进行原子拼接。要出列,只需设置head字段。

小总结

  • 有阻塞就需要排队,实现排队必然需要队列
  • state变量+CLH变种的双端队列

AbstractQueuedSynchronizer内部类Node源码

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...

     * Creates a new {@code AbstractQueuedSynchronizer} instance
    protected AbstractQueuedSynchronizer() { }

     * Wait queue node class.
    static final class Node {
        //表示线程以共享的模式等待锁
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        
        //表示线程正在以独占的方式等待锁
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        //线程被取消了
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;

        //后继线程需要唤醒
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        
        //等待condition唤醒
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        
        //共享式同步状态获取将会无条件地传播下去
        * waitStatus value to indicate the next acquireShared should     
        static final int PROPAGATE = -3;

        //当前节点在队列中的状态(重点)
        //说人话:
        //等候区其它顾客(其它线程)的等待状态
        //队列中每个排队的个体就是一个Node
        //初始为0,状态上面的几种
         * Status field, taking on only the values:
        volatile int waitStatus;

        //前驱节点(重点)
         * Link to predecessor node that current node/thread relies on
        volatile Node prev;

        //后继节点(重点)
         * Link to the successor node that the current node/thread
        volatile Node next;

        //表示处于该节点的线程
         * The thread that enqueued this node.  Initialized on
        volatile Thread thread;

        //指向下一个处于CONDITION状态的节点
         * Link to next node waiting on condition, or the special
        Node nextWaiter;

         * Returns true if node is waiting in shared mode.
        final boolean isShared() {

        //返回前驱节点,没有的话抛出npe
         * Returns previous node, or throws NullPointerException if null.
        final Node predecessor() throws NullPointerException {

        Node() {    // Used to establish initial head or SHARED marker

        Node(Thread thread, Node mode) {     // Used by addWaiter

        Node(Thread thread, int waitStatus) { // Used by Condition
    }
	...
}

AQS同步队列的基本结构

AQS源码深度解读 -1

从ReentrantLock开始解读AQS

Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。

在这里插入图片描述

 * A reentrant mutual exclusion {@link Lock} with the same basic
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

     * Base of synchronization control for this lock. Subclassed
    abstract static class Sync extends AbstractQueuedSynchronizer {

     * Sync object for non-fair locks
    static final class NonfairSync extends Sync {

     * Sync object for fair locks
    static final class FairSync extends Sync {

     * Creates an instance of {@code ReentrantLock}.
    public ReentrantLock() {
        sync = new NonfairSync();
    }

     * Creates an instance of {@code ReentrantLock} with the
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

     * Acquires the lock.
    public void lock() {
        sync.lock();//<------------------------注意,我们从这里入手
    }
        
    * Attempts to release this lock.
    public void unlock() {
        sync.release(1);
    }
    ...
}

从最简单的lock方法开始看看公平和非公平,先浏览下AbstractQueuedSynchronizer,FairSync,NonfairSync类的源码

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

	...

     * Acquires in exclusive mode, ignoring interrupts.  Implemented
    public final void acquire(int arg) {//公平锁或非公平锁都会调用这方法
        if (!tryAcquire(arg) &&//0.
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//1. 2.
            selfInterrupt();//3.
    }
    
    //0.
    * Attempts to acquire in exclusive mode. This method should query
    protected boolean tryAcquire(int arg) {//取决于公平锁或非公平锁的实现
        throw new UnsupportedOperationException();
    }
	
    
    //1.
    * Acquires in exclusive uninterruptible mode for thread already in
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //2.
    * Creates and enqueues node for current thread and given mode.
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    //3.
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    //这个方法将会被公平锁的tryAcquire()调用
    * Queries whether any threads have been waiting to acquire longer
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    
	...         
}

public class ReentrantLock implements Lock, java.io.Serializable {
    
    ...
    
    //非公平锁与公平锁的公共父类
     * Base of synchronization control for this lock. Subclassed
    abstract static class Sync extends AbstractQueuedSynchronizer {
    
    	...
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        } 
        ...
    
    }
        
    //非公平锁
	static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {//<---ReentrantLock初始化为非公平锁时,ReentrantLock.lock()将会调用这
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);//调用父类AbstractQueuedSynchronizer的acquire()
        }

        //acquire()将会间接调用该方法
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);//调用父类Sync的nonfairTryAcquire()
        }
    }
    
    * Sync object for fair locks
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {//<---ReentrantLock初始化为非公平锁时,ReentrantLock.lock()将会调用这
            acquire(1);调用父类AbstractQueuedSynchronizeracquire()
        }

        //acquire()将会间接调用该方法
         * Fair version of tryAcquire.  Don't grant access unless
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&//<---公平锁与非公平锁的唯一区别,公平锁调用hasQueuedPredecessors(),而非公平锁没有调用
                    							//hasQueuedPredecessors()在父类AbstractQueuedSynchronizer定义
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    
	...

}

可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法

对比公平锁和非公平锁的tyAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()

hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)

在这里插入图片描述

接下来讲述非公平锁的lock()
整个ReentrantLock 的加锁过程,可以分为三个阶段:

  1. 尝试加锁;
  2. 加锁失败,线程入队列;
  3. 线程入队列后,进入阻赛状态。

AQS源码深度解读 - 2

ReentrantLock的示例程序

带入一个银行办理业务的案例来模拟我们的AQS 如何进行线程的管理和通知唤醒机制,3个线程模拟3个来银行网点,受理窗口办理业务的顾客。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AQSDemo {
	
	public static void main(String[] args) {
		ReentrantLock lock = new ReentrantLock();
		
		//带入一个银行办理业务的案例来模拟我们的AQs 如何进行线程的管理和通知唤醒机制
		//3个线程模拟3个来银行网点,受理窗口办理业务的顾客

		//A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
				try {
					TimeUnit.SECONDS.sleep(5);//模拟办理业务时间
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			} finally {
				lock.unlock();
			}
		}, "Thread A").start();
		
		//第2个顾客,第2个线程---->,由于受理业务的窗口只有一个(只能一个线程持有锁),此代B只能等待,
		//进入候客区
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
			} finally {
				lock.unlock();
			}
		}, "Thread B").start();
		
		
		//第3个顾客,第3个线程---->,由于受理业务的窗口只有一个(只能一个线程持有锁),此代C只能等待,
		//进入候客区
		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName() + " come in.");
				
			} finally {
				lock.unlock();
			}
		}, "Thread C").start();
	}
}


程序初始状态方便理解图

在这里插入图片描述

启动程序,首先是运行线程A,ReentrantLock默认是选用非公平锁。

public class ReentrantLock implements Lock, java.io.Serializable {
    
    ...
        
    * Acquires the lock.
    public void lock() {
        sync.lock();//<------------------------注意,我们从这里入手,一开始将线程A的
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        ...

        //被NonfairSync的tryAcquire()调用
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        ...

    }
    
    
	//非公平锁
	static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {//<----线程A的lock.lock()调用该方法
            if (compareAndSetState(0, 1))//AbstractQueuedSynchronizer的方法,刚开始这方法返回true
                setExclusiveOwnerThread(Thread.currentThread());//设置独占的所有者线程,显然一开始是线程A
            else
                acquire(1);//稍后紧接着的线程B将会调用该方法。
        }

        //acquire()将会间接调用该方法
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);//调用父类Sync的nonfairTryAcquire()
        }
        

        
    }
    
    ...
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /**
     * The synchronization state.
     */
    private volatile int state;

    //线程A将state设为1,下图红色椭圆区
    /*Atomically sets synchronization state to the given updated value 
    if the current state value equals the expected value.
    This operation has memory semantics of a volatile read and write.*/
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

线程 A 开始执行任务。。。

在这里插入图片描述

AQS源码深度解读 - 3

线程B运行。。。

public class ReentrantLock implements Lock, java.io.Serializable {
    * Acquires the lock.
    public void lock() {
        sync.lock();//<------------------------注意,我们从这里入手,线程B的执行这
	//非公平锁
	static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {//<-------------------------线程B的lock.lock()调用该方法
            if (compareAndSetState(0, 1))//这是预定线程A还在工作,这里返回false
                setExclusiveOwnerThread(Thread.currentThread());//
            else
                acquire(1);//线程B将会调用该方法,该方法在AbstractQueuedSynchronizer,
            			   //它会调用本类的tryAcquire()方法
        }
        //acquire()将会间接调用该方法
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);//调用父类Sync的nonfairTryAcquire()
        }
    }

    //非公平锁与公平锁的公共父类
     * Base of synchronization control for this lock. Subclassed
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //acquire()将会间接调用该方法
    	...
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//这里是线程B
            int c = getState();//线程A还在工作,c=>1
            if (c == 0) {//false
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//(线程B == 线程A) => false
                int nextc = c + acquires;//+1
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;//最终返回false
        } 
    
    }
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&//线程B调用非公平锁的tryAcquire(), 最终返回false,加上!,也就是true,也就是还要执行下面两行语句
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//下一节论述
            selfInterrupt();
    }
}

假设线程B,C还没启动,正在工作线程A重新尝试获得锁,也就是调用lock.lock()多一次
    //非公平锁与公平锁的公共父类fa
     * Base of synchronization control for this lock. Subclassed
    abstract static class Sync extends AbstractQueuedSynchronizer {
    
    	...
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//这里是线程A
            int c = getState();//线程A还在工作,c=>1;如果线程A恰好运行到在这工作完了,c=>0,这时它又要申请锁的话
            if (c == 0) {//线程A正在工作为false;如果线程A恰好工作完,c=>0,这时它又要申请锁的话,则为true
                if (compareAndSetState(0, acquires)) {//线程A重新获得锁
                    setExclusiveOwnerThread(current);//这里相当于NonfairSync.lock()另一重设置吧!
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//(线程A == 线程A) => true
                int nextc = c + acquires;//1+1=>nextc=2
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);//state=2,说明要unlock多两次吧(现在盲猜)
                return true;//返回true
            }
            return false;
        } 
        ...
    
    }

_AQS源码深度解读 - 4

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

	...

     * Acquires in exclusive mode, ignoring interrupts.  Implemented
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&//线程B调用非公平锁的tryAcquire(), 最终返回false,加上!,也就是true,也就是还要执行下面两行语句
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//线程B加入等待队列
            selfInterrupt();//下一节论述
    }
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {//根据上面一句注释,本语句块的意义是将新节点快速添加至队尾
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//快速添加至队尾失败,则用这方法调用(可能链表为空,才调用该方法)
        return node;
    }
    
    //Inserts node into queue, initializing if necessary.
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))//插入一个哨兵节点(或称傀儡节点)
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//真正插入我们需要的节点,也就是包含线程B引用的节点
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    //CAS head field. Used only by enq.
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    //CAS tail field. Used only by enq.
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    
    ...
}

在这里插入图片描述

线程B进入等待队列

AQS源码深度解读 - 5

线程A依然工作,线程C如线程B那样炮制加入等待队列。

在这里插入图片描述

双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。

AQS源码深度解读 - 6

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

	...

     * Acquires in exclusive mode, ignoring interrupts.  Implemented
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&//线程B调用非公平锁的tryAcquire(), 最终返回false,加上!,也就是true,也就是还要执行下面两行语句
            //线程B加入等待队列,acquireQueued本节论述<--------------------------
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//
    }
    
    //Acquires in exclusive uninterruptible mode for thread already inqueue. 
    //Used by condition wait methods as well as acquire.
    //
    //return true if interrupted while waiting
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//1.返回前一节点,对与线程B来说,p也就是傀儡节点
				//p==head为true,tryAcquire()方法说明请转至 #21_AQS源码深度解读03
                //假设线程A正在工作,现在线程B只能等待,所以tryAcquire(arg)返回false,下面的if语块不执行
                //
                //第二次循环,假设线程A继续正在工作,下面的if语块还是不执行
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //请移步到2.处的shouldParkAfterFailedAcquire()解说。第一次返回false, 下一次(第二次)循环
                //第二次循环,shouldParkAfterFailedAcquire()返回true,执行parkAndCheckInterrupt()
                if (shouldParkAfterFailedAcquire(p, node) && 
                    //4. 
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    
    static final class Node {

        ...
        //1.返回前一节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        
        ...

    }
    
    //2. 
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//此时pred指向傀儡节点,它的waitStatus为0
        //Node.SIGNAL为-1,跳过
        //第二次调用,ws为-1,条件成立,返回true
        if (ws == Node.SIGNAL)//-1
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//跳过
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //3. 傀儡节点的WaitStatus设置为-1//下图红圈
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;//第一次返回
    }
    
    /**
     * CAS waitStatus field of a node.
     */
    //3.
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
    
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    //4.
    private final boolean parkAndCheckInterrupt() {
        //前段章节讲述的LockSupport,this指的是NonfairSync对象,
        //这意味着真正阻塞线程B,同样地阻塞了线程C
        LockSupport.park(this);//线程B,C在此处暂停了运行<-------------------------
        return Thread.interrupted();
    }
    
}

在这里插入图片描述

图中的傀儡节点的waitStatus由0变为-1(Node.SIGNAL)。

接下来讨论ReentrantLock.unLock()方法。假设线程A工作结束,调用unLock(),释放锁占用。

public class ReentrantLock implements Lock, java.io.Serializable {
    
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        ...
        //2.unlock()间接调用本方法,releases传入1
        protected final boolean tryRelease(int releases) {
            //3.
            int c = getState() - releases;//c为0
            //4.
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {//c为0,条件为ture,执行if语句块
                free = true;
                //5.
                setExclusiveOwnerThread(null);
            }
            //6.
            setState(c);
            return free;//最后返回true
        }
    	...
    
    }
    
    static final class NonfairSync extends Sync {...}
    
    public ReentrantLock() {
        sync = new NonfairSync();//我们使用的非公平锁
    }
    					//注意!注意!注意!
    public void unlock() {//<----------从这开始,假设线程A工作结束,调用unLock(),释放锁占用
        //1.
        sync.release(1);//在AbstractQueuedSynchronizer类定义
    }
    
    ...
 
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...
    //1.
    public final boolean release(int arg) {
        //2.
        if (tryRelease(arg)) {//该方法看子类NonfairSync实现,最后返回true
            Node h = head;//返回傀儡节点
            if (h != null && h.waitStatus != 0)//傀儡节点非空,且状态为-1,条件为true,执行if语句
                //7.
                unparkSuccessor(h);
            return true;
        }
        return false;//返回true,false都无所谓了,unlock方法只是简单调用release方法,对返回结果没要求
    }
    
    /**
     * The synchronization state.
     */
    private volatile int state;

    //3.
    protected final int getState() {
        return state;
    }

    //6.
    protected final void setState(int newState) {
        state = newState;
    }
    
    //7. Wakes up node's successor, if one exists.
    //传入傀儡节点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;//傀儡节点waitStatus为-1
        if (ws < 0)//ws为-1,条件成立,执行if语块
            compareAndSetWaitStatus(node, ws, 0);//8.将傀儡节点waitStatus由-1变为0

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;//傀儡节点的下一节点,也就是带有线程B的节点
        if (s == null || s.waitStatus > 0) {//s非空,s.waitStatus非0,条件为false,不执行if语块
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)//s非空,条件为true,不执行if语块
            LockSupport.unpark(s.thread);//唤醒线程B。运行到这里,线程A的工作基本告一段落了。
    }
    
    //8.
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
    
    
}

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    ...

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    //5.
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    //4.
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

线程A结束工作,调用unlock()的tryRelease()后的状态,state由1变为0,exclusiveOwnerThread由线程A变为null。

在这里插入图片描述

线程B被唤醒,即从原先park()的方法继续运行

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

     private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//线程B从阻塞到非阻塞,继续执行
        return Thread.interrupted();//线程B没有被中断,返回false
    }
    
	...
 
    //Acquires in exclusive uninterruptible mode for thread already inqueue. 
    //Used by condition wait methods as well as acquire.
    //
    //return true if interrupted while waiting
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//线程B所在的节点的前一节点是傀儡节点
                //傀儡节点是头节点,tryAcquire()的说明请移步至#21_AQS源码深度解读03
                //tryAcquire()返回true,线程B成功上位
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//1.将附带线程B的节点的变成新的傀儡节点
                    p.next = null; // help GC//置空原傀儡指针与新的傀儡节点之间的前后驱指针,方便GC回收
                    failed = false;
                    return interrupted;//返回false,跳到2.acquire()
                }
               
                if (shouldParkAfterFailedAcquire(p, node) && 
                    //唤醒线程B继续工作,parkAndCheckInterrupt()返回false
                    //if语块不执行,跳到下一循环
                    parkAndCheckInterrupt())//<---------------------------------唤醒线程在这里继续运行
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //1. 
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    //2.
    * Acquires in exclusive mode, ignoring interrupts.  Implemented
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            //acquireQueued()返回fasle,条件为false,if语块不执行,acquire()返回
            //也就是说,线程B成功获得锁,可以展开线程B自己的工作了。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//
    }
    
}

最后,线程B上位成功。

在这里插入图片描述

文章来源:https://blog.csdn.net/liu_lucky/article/details/135171560
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。