首页 > 基础资料 博客日记
深入源码解析 ReentrantLock、AQS:掌握 Java 并发编程关键技术
2023-07-24 10:16:42基础资料围观301次
前言
介绍 ReentrantLock、AQS 之前,先分析它们的来源,来自于 JUC 中的核心组件,java.util.concurrent 在并发编程中是比较会常用的工具类,里面包含了很多在并发场景下使用的组件,比如:线程池 > ThreadPoolExecutor、阻塞队列 > BlockingQueue、计数器 > CountDownLatch、循环屏障 > CyclicBarrier、信号量 > Semaphore、并发集合 > ConcurrentHashMap | CopyOnWriteArrayList |ConcurrentSkipListMap 等
ReentrantLock 与 synchronized 具有相同的基本行为、语义,但它扩展了一些其他的功能且更能灵活控制锁
1、ReentrantLock 提供了公平锁、非公平锁的机制,而 synchronized 并没有公平锁的机制
2、ReentrantLock 提供了 tryLock 方法,尝试获取锁而不会阻塞线程去作其他的事情,更加灵活
3、ReentrantLock#lockInterruptibly 方法提供了响应中断的能力,若当前在等待锁的线程被中断了,通过此方法可以捕获到中断异常,以便作相应的异常处理
4、ReentrantLock > tryLock(long time, TimeUnit unit) 方法提供了锁超时等待能力,可以指定等待锁的超时时间,对于限时等待的场景很有用
5、ReentrantLock 可以通过 newCondition 方法获取多个 Condition 对象来实现多个条件变量,以便可以更加细粒度地调用 await、signal 等待、唤醒操作;synchronized 只能通过 wait、notify 方法实现简单的等待和唤醒
在该篇博文主要介绍 ReentrantLock 是如何实现的,以及它的核心方法源码,如何结合 AQS 实现锁解决并发安全问题的
Lock
Lock 是 JUC 组件下最核心的接口,绝大部分组件都使用到了 Lock 接口,所以先以 Lock 接口作为切入点讲解后续的源码
Lock 本质上是一个接口,它提供了获得锁、释放锁、条件变量、锁中断能力,定义为接口就意味着它定义了一个锁的标准规范,也同时意味着锁的不同实现。实现 Lock 接口的类有很多,以下为几个常见的锁实现
- ReentrantLock:表示为重入锁,它是唯一一个实现了 Lock 接口的类;重入锁是指当前线程获得锁以后,再次获取锁不需要进行阻塞,而是直接累加 AbstractQueuedSynchronizer#state 变量值
- ReentrantReadWriteLock:表示重入读写锁,实现了 ReadWriteLock 接口,在该类中维护了两种锁,一个是 ReadLock,另外一个是 WriteLock,它们各自实现了 Lock 接口。
读写锁是一种适合读多写少的场景下,来解决线程安全问题的组件,基本的原则:读读不互斥、读写互斥、写写互斥,一旦涉及到数据变化的操作都会是互斥的
- StampedLock:该锁是 JDK 8 引入的锁机制,是读写锁的一个改进版本,读写锁虽然通过分离读、写功能使得读、读之间可以并行,但是读、写是互斥的,若大量的读线程存在,可能会引起写线程的饥饿;StampedLock 是一种乐观锁的读策略,采用 CAS 乐观锁完全不会阻塞写线程
重要的方法,简介如下:
- lock:若锁可用就获得锁,若锁不可用就阻塞,直接锁被释放
- lockInterruptibly:与 lock 方法相似,但阻塞的线程可中断,会抛出
java.lang.InterruptedException 异常
- tryLock:非阻塞获取锁,尝试获取锁,若成功返回 true
- tryLock(long timeout, TimeUnit timeUnit):带有超时时间的获取锁方法
- unLock:释放锁
重入锁
重入锁,支持同一个线程在同一个时刻获取同一把锁;也就是说,若当前线程 T1 调用 lock 方法获取了锁以后,再次调用 lock,是不会再以阻塞的方式去获取锁的,直接增加锁的重入次数就 OK 了。
synchronized、ReentrantLock 都支持重入锁,存在多个加锁的方法相互调用时,其实就是一种锁可重入特性的场景,以下通过不同的代码案例来演示可重入锁是怎样的
synchronized
/**
* @author vnjohn
* @since 2023/6/17
*/
public class SynchronizedDemo {
public synchronized void lockMethodOne() {
System.out.println("begin:lockMethodOne");
lockMethodTwo();
}
public void lockMethodTwo() {
synchronized (this) {
System.out.println("begin:lockMethodTwo");
}
}
public static void main(String[] args) {
SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
new Thread(() -> synchronizedDemo.lockMethodOne()).start();
}
}
调用 lockMethodOne 方法获取了当前实例的锁,然后在这个方法里面还调用了 lockMethodTwo 方法,lockMethodTwo 虽然是代码块锁,但锁住的也是当前实例;若不支持锁可重入时,当前线程会因为无法获取 lockMethodTwo 实例锁而被阻塞,即会发生死锁现象,重入锁设计的目的是为了避免线程的死锁
ReentrantLock
ReentrantLock 与 synchronized 同理,示例代码如下:
/**
* @author vnjohn
* @since 2023/6/17
*/
public class ReentrantLockDemo {
static Lock lock = new ReentrantLock();
public void lockMethodOne() {
lock.lock();
try {
System.out.println("begin:lockMethodOne");
lockMethodTwo();
} finally {
lock.unlock();
}
}
public void lockMethodTwo() {
lock.lock();
try {
System.out.println("begin:lockMethodTwo");
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
new Thread(()-> reentrantLockDemo.lockMethodOne()).start();
}
}
ReentrantReadWriteLock 读写锁
上面提及到的 synchronized、ReentrantLock 重入锁的特性其实是排它锁,也是悲观锁,该锁在同一时刻只允许一个线程进行访问,而读写锁在同一个时刻可以允许多个线程(读)访问,但是在写线程访问时,所有的读线程、其他写线程都会被阻塞。读写锁维护了一对锁:读锁 > ReentrantReadWriteLock.ReadLock、写锁 > ReentrantReadWriteLock.WriteLock;一般情况下,读写锁的性能会比悲观锁性能好,因为在大多数场景下读都是多于写的,读写锁能够比排它锁提供更好的并发性、吞吐量;通过案例来演示读写锁如何使用,如下:
/**
* @author vnjohn
* @since 2023/6/18
*/
public class ReentrantReadWriteLockDemo {
private static Map<String, Object> CACHE_MAP = new HashMap<>();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
/**
* 通过读锁从本地缓存中获取数据
*
* @param key
* @return
*/
public static Object get(String key) {
readLock.lock();
try {
System.out.println("本地缓存读取数据:" + key);
TimeUnit.SECONDS.sleep(1);
return CACHE_MAP.get(key);
} catch (InterruptedException e) {
return null;
} finally {
readLock.unlock();
}
}
/**
* 通过写锁从本地缓存中获取数据
*
* @param key
* @return
*/
public static Object put(String key, Object obj) {
writeLock.lock();
try {
System.out.println("本地缓存写入数据:" + key);
TimeUnit.SECONDS.sleep(1);
return CACHE_MAP.put(key, obj);
} catch (InterruptedException e) {
return null;
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
String keyOnce = "thread-batch-once";
for (int i = 0; i < 5; i++) {
// 演示读写锁互斥的情况,
new Thread(()-> ReentrantReadWriteLockDemo.get(keyOnce)).start();
new Thread(()-> ReentrantReadWriteLockDemo.put(keyOnce, Thread.currentThread().getName())).start();
}
}
}
在该案例中,通过 HashMap 来模拟了一个本地缓存,然后使用读写锁来保证这个本地缓存线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会阻塞,因为读操作不会影响执行结果
在执行写操作时,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性
读锁、读锁可以共享
读锁、写锁不可以共享(排它)
写锁、写锁不可以共享(排它)
ReentrantLock 实现原理
锁的基本原理是,将多线程并行任务基于某一种机制实现线程的串行执行,从而达到线程安全性的目的。在 synchronize 中,存在锁升级的概念 > 偏向锁、轻量级锁、重量级锁。基于 CAS 乐观自旋锁优化了 synchronize 加锁开销,同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争、同步的目的。那么在 ReentrantLock 中,也一定会存在这样的问题需要去解决
那么在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?提及这个必须先说说 AQS 是什么了!
AQS
AQS > 全称 AbstractQueuedSynchronizer,内部用到了一个同步等待队列,它是一个同步工具也是 Lock 用来实现线程同步的核心组件
从 AQS 功能、使用层面来说,AQS 分为两种:独占、共享
- 独占锁:同一时刻只能有一个线程持有锁,操作、写入资源,比如:ReentrantLock
- 共享锁:允许多个线程同时获取锁,并发访问共享资源,比如:ReentrantReadWriteLock
AQS 内部实现
AQS 内部的同步等待队列其实就是维护了一个 FIFO 的双向链表,这种结构的特点是每个节点都会两个指针,分别指向直接后继节点、直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱、后继节点。节点由内部类 Node 表示,Node 内部类其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 AQS 队列中去;当获取锁的线程释放锁以后,会从队列中唤醒其中一个阻塞的节点(线程)
Node 内部结构
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** 线程已取消等待锁,调用 tryLock(TimeUnit) 或 intercept 中断方法*/
static final int CANCELLED = 1;
/** 表明后续线程需要被唤醒 */
static final int SIGNAL = -1;
/** 表明线程在等待状态 */
static final int CONDITION = -2;
/**
* 在共享模式下,该值表明下一个需要被分享的节点应该无条件被分享
*/
static final int PROPAGATE = -3;
/**
* 0-默认值、CANCELLED、SIGNAL、CONDITION、PROPAGATE,
* 后续会通过 CAS 操作改变该值状态
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 当前线程
*/
volatile Thread thread;
/**
* 存储在 Condition 队列中的后继节点
*/
Node nextWaiter;
/**
* 是否为共享模式(共享锁)
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前驱节点或抛出空指针异常
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 用于建立初始的头或共享标记
Node() { // Used to establish initial head or SHARED marker
}
// 该构造方法会构造成一个 Node,添加到等待队列中
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 该构造方法会在 Condition 队列中使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node 变更过程
当出现锁竞争或释放锁时, AQS 同步等待队列中的节点会发生变化
添加节点
下面来看一下添加节点的场景是怎样的
在这里会发生三个变化:
- 新的竞争锁线程会封装成 Node 节点追加到同步队列中,设置 prev 节点指向原有的 tail 尾部节点
- 通过 CAS 操作将 tail 指针指向新加入的 Node 节点
- 修改原有的 tail 尾部节点 next 指针指向新加入的 Node
以上的变化发生在核心方法:AbstractQueuedSynchronizer#addWaiter 中
释放节点
head 节点表示获取锁成功的节点,当头节点在释放同步状态时,会唤醒后继节点,若后继节点获取锁成功,会将自身设置为头节点,节点的变化过程如下:
在这里会发生两个变化:
- 设置 head 头节点指向下一个获取锁的节点
- 新的获取锁节点,将 prev 指针指向 null
设置 head 头节点不需要使用 CAS 操作,原因:设置 head 头节点是由获取锁的线程来完成的,同步锁只能由一个线程获取,所以不适合通过 CAS 来保证,只需要把 head 头节点设置为原 head 头节点的后继节点,并且切断原 head 头节点的 next 引用即可
ReentrantLock 类源码分析
以 ReentrantLock 类作为入口,看看该类源码级别是如何使用 AQS 来实现线程同步的
时序图
ReentrantLock#lock 方法源码的调用过程,通过时序图的方式来进行展示
锁竞争核心方法
简单梳理了一下 lock 流程以后,下面来介绍 ReentrantLock、AQS 中一些核心方法内容以及其作用
NonfairSync#lock
NonfairSync 实现类是 ReentrantLock 类内部接口 Sync 的实现类,它采用非公平锁的方式进行锁竞争, 下面来看其源码内容是如何实现的
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
非公平锁、公平锁最大的区别:在于非公平锁抢占锁的逻辑是不管有没有等待队列中有没有线程在排队,我先上来用 CAS 操作抢占一下
- CAS 成功,即表示成功获取到了锁
- CAS 失败,调用 AbstractQueuedSynchronizer#acquire 方法走竞争锁逻辑
CAS(Compare And Set-比较并交换)
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通过 CAS 乐观锁的方式来作比较并替换,若当前内存中的 state 值与预期值 expect 相等,则替换为 update 值;更新成功返回 true,否则返回 false;该操作是原子性的,不会出现线程安全问题,这里面会涉及到 Unsafe 类的操作
state 是 AQS 中的一个属性,它在不同的组件实现中所表达的含义不一样,对于重入锁 ReentrantLock 来说,它有以下两个含义:
- 当 state = 0 时,表示无锁状态
- 当 state = 1 时,表示已经有线程获取到了锁
因为 ReentrantLock 允许可重入,所以同一个线程多次获取锁时,state 会递增,比如:在一段代码中,当前线程重复获取同一把锁三次(未释放的情况下)state 为 3;而在释放锁时,同样需要释放 3 次直至 state = 0 其他线程才有资格去获取这把锁
AQS#acquire
acquire 方法是 AQS 中的核心方法,若 CAS 操作未能成功,说明 state 值已经不为 0,此时继续调用 acquire(1) 方法操作,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法分以下几块逻辑进行:
- 通过 tryAcquire 尝试去获取独占锁,若成功返回 true,失败返回 false
- 若 tryAcquire 执行结果为 false,则会调用 addWaiter 方法,将当前线程封装成 Node 添加到 AQS 队列的尾部
- acquireQueued:将 Node 作为参数,通过自旋的方式去尝试获取锁,这里会执行线程的阻塞等待逻辑
ReentrantLock.NonfairSync#tryAcquire
NonfairSync#tryAcquire 方法重写至 AQS 类,AQS 该方法并没有实现,而是抛出异常,具体的实现内容交由给子类去进行实现,这里采用了设计模式 > 模版方法
具体的子类实现:ReentrantLock.NonfairSync#tryAcquire,该方法作用:尝试获取一把锁,若成功返回 true、失败返回 false
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
int c = getState();// 获取 state 状态
if (c == 0) { // 代表无锁状态
// CAS 替换 state 值,CAS 成功表示锁获取成功
if (compareAndSetState(0, acquires)) {
// 保存当前获取锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 若同一个线程多次获取同一把锁,直接增加锁重入次数即可
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 获取当前内存中 state 锁状态值
- state 状态为 0 代表当前锁处于无锁状态,首次获取锁的线程可以通过 CAS 操作更新 state 锁状态值
- 若当前线程等于锁占有的线程,则增加锁重入次数即可
- 其他情况,代表获取锁失败的线程,执行 AQS#acquire 方法中的 addWaiter(Node.EXCLUSIVE), arg) 方法 > 添加独占模式的 Node 队列节点
AQS#addWaiter
当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node,源码如下:
private Node addWaiter(Node mode) {
// 将当前线程封装为 Node 节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail 指向 AQS 同步队列中的尾部节点,默认:null
Node pred = tail;
// tail 不为空的情况下,说明同步队列中存在节点
if (pred != null) {
// 将当前线程 Node prev 前驱节点指针指向原来的尾部节点
node.prev = pred;
// 通过 CAS 操作将当前 Node 设置为尾部节点
if (compareAndSetTail(pred, node)) {
// 设置成功以后,将原来的尾部节点 next 后继节点指向当前 Node
pred.next = node;
return node;
}
}
// tail 为空的情况下,调用 enq 方法将当前 Node 添加到同步队列中
enq(node);
return node;
}
入参:mode 表示节点的状态,ReentrantLock 传入的状态参数:Node.EXCLUSIVE 代表独占模式,意味着重入锁获取锁采用独占的方式,addWaiter 方法基本的执行过程,如下所示:
- 将当前线程封装为 Node 节点对象
- 判断当前同步队列中的尾部节点是否为空
- 若尾部节点不为空,通过 CAS 操作将当前线程的 Node 添加到同步队列中,并将新加入的 Node 设置为尾节点,采用尾插法的方式进行队列入队的
- 若尾部节点为空或者 CAS 设置尾部节点失败,调用 enq 方法将当前 Node 添加到同步队列中
AQS#enq
该方法通过自旋的方式以便可以成功将当前节点加入到同步队列中
/**
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空,说明当前同步队列中未存在元素
// 初始化一个空对象 Node,先通过 CAS 将其设置为头节点,若成功再将其设置为尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 当前节点的前驱节点指向原来尾部节点、将当前节点设置为尾部节点、原来尾部节点后继节点指向当前节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq 方法执行只是为了维护同步等待队列的节点元素,当多个线程开始竞争锁时,必然会进行排队,第一次入队的线程不仅要承担将自身加入到队列中,同时还需要初始化一个空 Node 对象,将其设置为头尾节点
图解分析
假设有 3 个线程同时来争抢锁,那么截止到 AQS#addWaiter 或 AQS#enq 方法结束之后,AQS 中同步等待队列结构图,如下所示:
AQS#acquireQueued
当执行完 AQS#addWaiter 方法以后,会将返回的 Node 参数传递给 acquireQueued 方法,去实现锁竞争、阻塞线程逻辑,方法源码如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前 Node 节点的前驱 prev 节点
final Node p = node.predecessor();
// 若前驱 prev 节点为头节点,当前 Node 重新尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,说明锁已经被持有的线程所释放,设置当前 Node 为头节点
setHead(node);
// 将原 head 头节点从同步队列中移除
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取锁失败,会获取前驱节点并更新 waitStatus 状态值
// 随机调用原生锁 LockSupport#park 方法阻塞当前竞争锁的线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued 方法基本的执行过程同时以 AQS#enq 分析中的图解分析为例,如下所示:
- 获取当前节点 Node prev 前驱节点
比如:当前是线程 B,那么它的前驱节点就是 Thread 为 null 的头节点
- 若 prev 前驱节点为 head 头节点,那么它有资格再去争抢一次锁,调用 ReentrantLock.NonfairSync#tryAcquire 方法抢占锁
也就是说线程 B 在这里也会有一次机会再去争夺锁
- 若抢占锁成功,把当前抢到锁的 Node 节点设置为 head 头节点,并且移除原有的头节点
- 若抢占锁失败,先通过 shouldParkAfterFailedAcquire 方法更新一次 waitStatus 值状态,然后再调用原生锁支持 > LockSupport.park(this) 阻塞当前线程等待后续被唤醒
仍然以线程 A 未释放锁,线程 B 处于首节点的情况作以说明:由于 acquireQueued 方法是死循环,所有的 Node 新建时 waitStatus 属性值都为 0 (除了 Condition 条件变量)第一次遍历时会抢一次锁;这一次会调用 shouldParkAfterFailedAcquire 方法将 waitStatus -> 0 更改为 SIGNAL-待唤醒状态,该方法执行完以后会返回 false,然后会继续第二次循环,第二次执行 shouldParkAfterFailedAcquire 方法返回 true,接着会调用 parkAndCheckInterrupt 方法使用原生锁方式:LockSupport.park(this) > 阻塞当前线程并返回当前线程是否中断的标识
- 最后,通过 cancelAcquire 方法取消当前线程获取锁的节点
AQS#shouldParkAfterFailedAcquire
线程 A 未释放锁,线程 B、线程 C 来争抢锁肯定会失败,失败以后会调用 shouldParkAfterFailedAcquire 方法,Node#waitStatus 存在五种状态,如下:
- CANCELLED:值为 1,即为结束状态,在同步等待队列中等待的线程超时或被中断,需要从同步队列中取消该线程 Node 节点,进入该状态以后的节点将不再发生变化
- 0:初始化状态
- SIGNAL:值为 -1,当前驱节点释放锁以后,就会通知标识为 SIGNAL 状态的后继 Node 节点线程
- CONDITION:值为 -2,与 ReentrantLock#newCondition 条件变量有关系,AQS.ConditionObject#addConditionWaiter 在该方法中会提现出来
- PROPAGATE:值为 -3,在共享模式下,PROPAGATE 处于可运行状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 若前驱节点为 SIGNAL,意味着只需要等待其他前驱节点的线程被释放
// 当获取锁的线程调用 release 方法后,该前驱节点的线程就会被唤醒
if (ws == Node.SIGNAL)
// 返回 true,意味着当前线程可以放心调用 parkAndCheckInterrupt 方法进行挂起
return true;
// waitState 大于 0,意味着 prev 前驱节点取消了排队操作,直接将这个节点移除即可
if (ws > 0) {
// 相当于:pred=pred.prev;node.prev=pred;
// 从尾部节点开始查找,直到将所有的 CANCELLED 节点移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 使用 CAS 设置前驱 prev 节点状态为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法主要作用:通过 Node 节点状态来判断,线程 B、线程 C 竞争锁失败后是否应该要被挂起
- 若 Thread-B、Thread-C 前驱 prev 节点状态为 SIGNAL,表示可以放心挂起所在的当前线程
- 若当前线程 prev 节点状态为 CANCELLED,采用循环方式扫描同步等待队列将 CANCELLED 状态的节点从同步等待队列中移除
- 以上两个条件都满足,将前驱 prev 节点状态改为 SIGNAL,返回 false
该方法返回 true、false 代表的含义不同,当返回 true 时,不会进入 AQS#acquireQueued 方法的下一次循环,会调用 parkAndCheckInterrupt 方法将当前线程阻塞;当返回 false 时,会进入到 AQS#acquireQueued 方法的下一次循环再次尝试争抢一次锁,当抢锁成功当前线程就是独占线程,抢锁失败再调用 parkAndCheckInterrupt 方法将当前线程阻塞
AQS#parkAndCheckInterrupt
parkAndCheckInterrupt 方法逻辑比较简单,先看源码,如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- 调用 LockSupport#park 方法挂起当前线程编程 waiting 状态
LockSupport 原生锁支持
park 方法:阻塞当前线程,不需要使用 sync 修饰,直接可以使用
unpark 方法:唤醒指定线程
unpark 方法可以先于 park 方法先调用,unpark 相当于是获取许可数量 1、park 相当于是消费许可数量 1
- Thread#interrupted:返回当前线程是否被其他线程触发过中断请求,也就是调用 Thread#interrupt 方法;若有触发过中断请求,那么该方法会返回当前的中断标识为 true,并且会对中断标识进行复位标识已经响应过了中断请求,也就是会在 AQS#acquire 方法中执行 selfInterrupt 方法
- selfInterrupt:标识当前线程是否执行 AQS#acquireQueued 方法时被中断过,若被中断过,则需要响应中断请求,因为在线程调用 AQS#acquireQueued 方法是不会去响应中断请求的
通过 AQS#acquireQueued 方法来竞争锁,若 Thread-A 仍然还在执行中未释放锁,那么 Thread-B、Thread-C 还会继续挂起
到这里,锁相关的竞争方法在这里基本上都介绍过了,其实看到这里,能发现,当竞争的锁线程失败时,会调用 LockSupport#park 方法阻塞住,等待锁匙放时,还会有 LockSuppor#unpark 方法进行锁匙放,下面就来分析锁匙放时的一些核心方法是如何处理的!!!
锁释放核心方法
若此时 Thread-A 释放锁了,那么接下来 Thread-B、Thread-C 是如何走的呢?
ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
在 unlock 方法中,会调用其内部类 Sync#release 方法,但由于 Sync 并未其父类 AQS#release 方法,所以它会延用其父类 AQS#release 方法的处理逻辑,源码如下:
public final boolean release(int arg) {
// 若释放占用当前锁的节点 Node 线程成功
if (tryRelease(arg)) {
// 获取 AQS 同步等待队列中的 head 头节点
Node h = head;
// 若 head 节点不为空 & waitStatus 非默认值,直接唤醒下一个节点去争抢锁
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法主要的执行流程分为几步,如下:
- 先调用 ReentrantLock.Sync#tryRelease 方法探测锁释放是否可以成功,它来自 AQS 子类 ReentrantLock.Sync 所实现的
- 获取同步等待队列中的 head 首节点,若其不为空,并且它的 waitStatus 属性值非默认值 0,那么就会调用 unparkSuccessor 方法唤醒队列中的下一个节点
ReentrantLock.Sync#tryRelease
该方法也体现了锁重入次数的操作,源代码如下:
protected final boolean tryRelease(int releases) {
// 当前锁线程重入次数减去要释放的次数
int c = getState() - releases;
// 当前线程不等于锁持有线程,则判断中断监听锁异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 若减去后的锁次数为 0
if (c == 0) {
// 返回 true、设置锁持有线程为 null,其他线程就可以竞争锁了
free = true;
setExclusiveOwnerThread(null);
}
// 递减锁重入次数,返回 false,锁仍然被当前线程所持有
setState(c);
return free;
}
ReentrantLock.Sync#tryRelease 执行流程主要分析如下:
- 通过将 AQS#state 属性值减少传入的参数值(参数:1)若减去的结果状态值为 0,就将排它锁 Owner 持有线程设置为 null,同时返回 true,以便于其他的线程有机会执行竞争锁操作
- 若减去的结果状态值不为 0,返回 free 变量默认值 false,当前线程仍然继续持有这把锁,其他线程暂时不可以争抢锁
在排它锁中,加锁时 state 状态会增加 1,在解锁时会减去 1,同一把锁,在被重入时,可能会被叠加为 2、3、4 等,只有当调用 unlock 方法次数与调用 lock 方法次数相对应,才会把锁 Owner 持有线程设置为空,也只有这种情况下该方法执行结果才有返回 true
AQS#unparkSuccessor
当 ReentrantLock.Sync#tryRelease 方法执行完以后,会取同步等待队列中首节点,唤醒队列中下一个节点去争抢这把锁,该方法源码如下:
private void unparkSuccessor(Node node) {
// 获取传入节点的 waitStatus 属性值
int ws = node.waitStatus;
if (ws < 0)
// 小于 0 通过 CAS 将其修改为 0
compareAndSetWaitStatus(node, ws, 0);
// 获取传入节点的后继节点
Node s = node.next;
// 若后继节点为空或者 waitStatus 大于 0 说明它是 CANCELLED-结束状态
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部节点开始扫描,找到距离当前传入节点最近的一个 waitStatus 小于等于 0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 将同步等待队列中 > 最前面的一个非 CANCELLED 状态的 Node 线程进行唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
分析一下此方法分别会作那些事情,如下:
- 获取当前传入节点 Node waitStatus 属性值,若它小于 0 时,先通过 CAS 操作将其修改为 0
当前节点其实就是 head 头节点,唤醒的操作不会唤醒头节点,只会唤醒头节点后面不为 CANCELLED 状态的首节点 Node 线程
- 获取当前节点的 next 后继节点,若后继节点为空或 waitStatus 大于 0(CANCELLED)那么就会遍历该同步等待队列,从尾部往前查找的方式,匹配到与当前节点最近的一个非 CANCELLED 节点,将其设置为待唤醒的节点
- 若待唤醒的节点不为空,调用原生锁 LockSupport#unpark 方法将其唤醒,以便它可以再次去争抢锁
当节点被唤醒后,比如:Thread-A 释放锁成功以后,会调用 AQS#unparkSuccessor 方法唤醒它的下一个节点 Thread-B 所持有的(非 CANCELLED)
随机 Thread-B 被唤醒,它会继续执行 AQS#acquireQueued 方法中的循环,执行:if (p == head && tryAcquire(arg)) 代码块,所以后续被唤醒的线程都会是这样,通过该代码来确保同步队列中的节点能够获取锁资源
那么为什么在释放锁的时候一定要从尾部开始扫描呢?
回顾一下 AQS#enq 方法执行的逻辑,插入新节点时,它是从队列尾部进行节点入队的,
看下图红色所标注的
,在 CAS 操作成功之后,t.next = node; 操作之前,可能会存在其他线程调用 unlock 方法从 head 开始向后遍历,由于 t.next = node; 还未执行也就意味着同步等待队列关系还未建立完整,就会导致遍历到原始的尾部节点时被中断 > 队列中的链表关系断链了;所以说,从后往前遍历就不会出现这个问题
挂起线程被唤醒后执行过程
当持有锁的线程调用 ReentrantLock#unlock 方法,原本被挂起的 Thread-B、Thread-C 线程就有机会被唤醒再继续执行,被唤醒之后的线程会继续执行 AQS#acquireQueued 方法内的循环,该方法在上面已经分析过了,接下来以 Thread-B 被唤醒后为例,看它整个的执行过程以及变化,以流程图的方式呈现
同步等待队列变更结构图:
同步等待队列执行过程流程图:
博主是以如下源码,对 ReentrantLock、AQS 核心方法源码进行查看的,分享如下:
public class MultiThreadReentrantLockDemo {
private static final ReentrantLock LOCK = new ReentrantLock();
public void threadAProcess() {
LOCK.lock();
try {
System.out.println("执行:threadAProcess 方法");
// 处理业务逻辑中....
// 断点过程中该时间可以延长
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
public void threadBProcess() {
LOCK.lock();
try {
System.out.println("执行:threadBProcess 方法");
// 处理业务逻辑中....
TimeUnit.SECONDS.sleep(60 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
public void threadCProcess() {
LOCK.lock();
try {
System.out.println("执行:threadCProcess 方法");
// 处理业务逻辑中....
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
MultiThreadReentrantLockDemo multiThreadLock = new MultiThreadReentrantLockDemo();
new Thread(()-> multiThreadLock.threadAProcess(), "Thread-A").start();
Thread threadB = new Thread(() -> multiThreadLock.threadBProcess(), "Thread-B");
threadB.start();
// 可能会出现 Thread-C 先执行的情况,所以先通过 join 方法让线程 B 先跑完
threadB.join();
new Thread(()-> multiThreadLock.threadCProcess(), "Thread-C").start();
}
}
注意:断点模式下,要以 Thread 模式执行;如下图:
公平锁、非公平锁区别
锁的公平与否其实取决于获取锁的顺序性,若为公平锁,那么获取锁的顺序应该绝对符合 FIFO 队列 > 先进先出的特性,上面所分析的例子都是以非公平锁(默认是非公平锁)只要 CAS 设置 AQS#state 属性值成功,就代表当前线程获取到了锁,而公平锁不一样,差异的地方有如下两点:
1、FairSync#lock、NonfairSync#lock
非公平锁在获取锁时,先通过 CAS 操作进行锁抢占,而公平锁不会
2、FairSync#tryAcquire、NonfairSync#tryAcquire
两者方法之间不同之处在于判断多了一个条件:hasQueuedPredecessors,也就是说加入了同步队列中当前节点是否有前驱节点的判断,若该方法返回 true,则表示有线程比当前线程更早入队、更早地请求获取锁,因此,需要等待前驱节点的线程获取完再释放锁以后才能继续获取锁!
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
1、h != t:头尾节点是否相同,若相同则表示队列中只有一个节点,即当前未发生锁竞争
假设:当前只有线程 Thread-A 一人争抢锁,那么 head == null && tail ==null,那么返回 false 会去争抢锁;反之,会继续走第二步的判断
2、(s = h.next) == null:检查头节点的后继节点是否为空,即判断是否存在后继节点
假设:线程 Thread-A、Thread-B 同时争抢锁,Thread-A 抢到了,那么同步等待队列中会有头节点、Thread-B 所在节点,条件不满足返回 false,会继续走第三步的判断;反之,当前只有一个节点 > 返回 true 不会去争抢锁,不走第三步的判断
3、s.thread != Thread.currentThread():检查后继节点的线程是否与当前线程不同,即判断后继节点持有线程是否为当前线程
假设:头节点的后继节点持有线程就是当前的线程,会返回 false 会去争抢锁;反之,头节点的后继节点持有线程不是当前的线程,会返回 true 不会去争抢锁,它会进入到排队模式!!!
总结
ReentrantLock 基于悲观锁实现(LockSupport),但是在处理 AQS#state 锁状态时是基于 CAS 乐观锁实现的,两者在不同场景下都会各自的好处,因为前者已经悲观锁,后者再用 CAS 操作并没有任何问题
>在这里其实就是偷换概念了,不一定用了悲观锁就不能用乐观锁
该篇博文介绍了 JUC 组件下 ReentrantLock 核心概念、使用、源码以及 AQS 基础组件的核心方法,阐述了 AQS 内部实现、数据结构以及节点变更过程,在后面,看 ReentrantLock 是如何基于 AQS 核心方法去完成其内部锁竞争工作的、锁释放后如何唤醒其他节点线程,全文上下以画图+文字加以说明,不限于时序图、结构图、流程图,最后说明了在 ReentrantLock 公平锁、非公平锁之间的区别,希望能够帮助你快速理解 AQS 内部如何巧妙处理高并发场景问题的
如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!
推荐专栏:Spring、MySQL,订阅一波不再迷路
大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: