写在前面:

本人在学习 Java 并发编程 学习到一种除了 synchronized 用得较多的一种锁 ReentrantLock,本文旨在学习研究 ReentrantLock 相关知识,如 AbstractQueuedSynchronizer(简称 AQS),Lock 接口等。但碍于学识浅薄,文章中难免会出现错误,恳请各位读者加以指正。


本文主要通过以下几个方面来系统的介绍 ReentrantLock

  1. AQS 同步器
  2. 基于 AQS 自定义一个阻塞式的锁
  3. ReentrantLock
  4. ReentrantReadWriteLock 读写锁

AQS

ReentrantLock 继承自 AbstractQueuedSynchronizer(简称 AQS),所以我们从 AQS 开始讲起。

AQS (抽象的、基于队列的同步器)是**阻塞式锁相关同步器工具**的框架,即其他的同步器工具都是它的子类,且基于 AQS 实现的锁是阻塞式的锁。

AQS 的特点有:

  1. state 属性(volatile int)来表示资源的状态,分为独占模式 (只有一个线程访问一个资源)和共享模式(可以有多个线程访问多个资源)。需要由 AQS 的子类来定义如何维护这个状态,控制如何获取锁与释放锁。
    • getState :获得 state 状态
    • setState :设置 state 状态
    • compareAndState :使用 CAS 机制设置 state 状态(保证 state 变量的原子性)
  2. 提供了基于 FIFO 的等待队列(JDK 层面),类似于 ObjectMonitor 中的 _EntryList(c++ 底层);
  3. 条件变量 Condition 来实现 wait \ notify 机制,类似于 ObjectMonitor 中的 _WaitSet,但是支持多个条件变量

AQS 的子类主要实现(@override)如下方法:

  • boolean tryAcquire():获取锁,只尝试一次。如果获取锁成功,则返回 true;反之 false,并将当前线程阻塞(底层使用 park() 阻塞当前线程),加入到等待队列中。
1
2
3
4
// 如果获取锁失败
if (!tryAcquire()) {
// 规则由子类规定:阻塞,并加入等待队列
}
  • tryRelease():释放锁,只尝试一次。如果释放锁成功,则返回 true;反之 false
1
2
3
4
// 如果释放锁失败
if (!tryRelease()) {
// 规则由子类规定:恢复 unpark()
}
  • tryAcquireShared():获取锁
  • tryReleaseShared():释放锁
  • isHeldExclusively()

自定义一个阻塞式的锁

【步骤】

  1. 首先我们要让我们自定义的 MyLock 这个类实现 java.util.concurrent.lock.Lock 接口;
  2. 然后在 MyLock 中实现一个 AQS 同步器类。

【具体实现】

  • 我们先写一个 MySync 类继承自 ADS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 我们希望实现这个同步器类MySync。
// 如果我们希望自定义一个不可重入锁,那么我们需要重写以下几个方法:
// 1 tryAcquire()
// 2 tryRelease()
// 3 isHeldExclusively()
// 4 newCondition()

class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)) { // 使用CAS保证state状态的修改(0 -> 1)是原子性的
// 如果成功了,就说明当前线程已经拥有了锁
// 将锁的Owner该为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else {
return false;
}
}

@Override
protected boolean tryRelease(int arg) {
// 先判断当前线程是否持有这个锁
if (getExclusiveOwnerThread() == Thread.currentThread()) {
// 如果持有,将锁的Owner该为null
// 再修改state
// 思考:为什么这里要先"释放"锁再改变state呢?
// 因为JIT的指令重排序可能会打乱指令执行的顺序,而state是由volatile修饰的,
// 在此处有一个读写屏障,从而保证前面的修改的可见性。
setExclusiveOwnerThread(null);
setState(0);
return true;
} else {
return false;
}
}

@Override
protected boolean isHeldExclusively() { // 是否持有该不可重入锁
// 持有独占锁的时候就以为着,state = 1
return getState() == 1;
}

protected Condition newCondition() {
return new ConditionObject();
}
}
  • 我们再实现 Lock 接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class MyLock implements Lock {
// 我们可以看到,我们需要重写很多Lock类的方法。
// 但是我们有一个继承AQS的同步器类MySync,他可以实现大部分方法。
private MySync sync = new MySync();

@Override
public void lock() { // 加锁,不成功就进入等待队列等待
sync.acquire(1); // 先tryAcquire(),若不成功则加入等待队列

}

@Override
public void lockInterruptibly() throws InterruptedException { // 加可打断锁
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() { // 尝试加锁,一次
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 带有超时时间的尝试加锁
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() { // 解锁
sync.release(1); // release() = tryRelease() + 唤醒阻塞队列中的线程
}

@Override
public Condition newCondition() { // 创建一个条件变量
return sync.newCondition();
}
}
  • 我们写一个测试方法来测试这个 MyLock 是否符合预期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class test {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread (() -> {
lock.lock();
try {
System.out.println("Time:" +System.currentTimeMillis()+ "\t t1 加锁");
System.out.println("Time:" +System.currentTimeMillis()+ "\t t1 等待3秒。。。");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
{
System.out.println("Time:" +System.currentTimeMillis()+ "\t t1 解锁");
lock.unlock();
}
}
},"t1").start();

new Thread (() -> {
lock.lock();
System.out.println("Time:" +System.currentTimeMillis()+ "\t t2 加锁");
System.out.println("Time:" +System.currentTimeMillis()+ "\t t2 再次加锁。应该会被阻塞。程序不会结束。。。。");
lock.lock();
try {
System.out.println("Time:" +System.currentTimeMillis()+ "\t t2 加锁");
} finally {
{
System.out.println("Time:" +System.currentTimeMillis()+ "\t t2 解锁");
lock.unlock();
}
}
},"t2").start();
}
}

输出:

1
2
3
4
5
6
Time:1662389256121	 t1 加锁
Time:1662389256121 t1 等待3秒。。。
Time:1662389259126 t1 解锁
Time:1662389259126 t2 加锁
Time:1662389259126 t2 再次加锁。应该会被阻塞。程序不会结束。。。。

【结果分析】

我们可以看到,MyLock 类确实能实现锁的功能。而且在 t2 线程第二次调用 lock.lock() 时,因为不是可重入错,线程被阻塞了。

ReentrantLock

经过以上过程,我们应该已经对“如何实现一个锁”有了大致的思路。现在我们来分析与上述例子相似的 ReentrantLock

给出 ReentrantLockUML 类图:

image-20220905170944981

可以看出,对于 ReentrantLockSync 同步器有两种类型:FairSyncNonfairSync。所以 ReentrantLock 中也有公平锁与非公平锁(默认)的实现。

ReentrantLock 的非公平锁实现

ReentrantLock 构造器

ReentrantLock 的构造器开始:

1
2
3
4
5
6
7
 public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可以看出,ReentrantLock 默认是非公平锁(无参构造时)。NonfairSync 是间接继承自 AQS

ReentrantLock 的加锁流程

1
2
3
4
5
6
7
final void lock() {
if (compareAndSetState(0, 1)) // 用CAS试图将state状态由0改为1
// 成功的话,将Owner设为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 失败,执行 AQS.acquire()。见下方
}
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && // !(能否用CAS试图将state状态由0改为1)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 尝试创建一个Node,将其加入等待对列
selfInterrupt(); // 执行打断(若打断标记为真)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 设置打断标记为 false
for (;;) {
final Node p = node.predecessor(); // p 为当前节点的前驱节点
if (p == head && tryAcquire(arg)) { // 检查p是不是Head && CAS尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 检查尝试获取锁失败后是否阻塞,
// 并将后继节点不是Tail的节点的waitState设为-1,意为有义务唤醒后继节点
parkAndCheckInterrupt()) // 阻塞node,并检查node是否被打断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 我们可以看到,ReentrantLock 底层使用的是 park
return Thread.interrupted();
}
没有竞争时

当只有一个线程 thread 0 想获得这个锁时,

  1. CAS 试图将 state 状态由 0 改为 1,成功;
  2. setExclusiveOwnerThread(Thread.currentThread()):将 Owner 改为当前线程。
image-20220905173324044
第一个竞争出现时

在另一个线程 thread 1 来竞争这个锁时,

  1. CAS 试图将 state 状态由 0 改为 1,但是失败了,因为 state 已经是 1 了;进入 acquire()

  2. !tryAcquire():再次用 CAS 试图将 state 状态由 0 改为 1,但是失败了,返回 true

  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg):尝试将 thread 1 成装成 Node,存入队列

    • 图中黄色三角形表示该 NodewaitStatus 状态,其中 0 默认正常状态,-1 则说明当前节点有义务唤醒后继节点

    • Node 的创建是懒惰的

    • 启动第一个 Nodenull,我们称之为 Dummy(哑元),用来占位,并不关联线程。一般被 Head 引用。

      image-20220906112622059
  4. 当线程进入 acquireQueued() 方法内:

    • acquireQueued() 会一直在一个 for(;;) 的死循环内不断尝试获得锁,失败之后会 park() 阻塞当前线程;

    • 如果 node(当前线程)的前驱节点是 Head,则再次尝试 CAS 获得锁;但是此时 thread 1 失败;

    • 进入 shouldParkAfterFailedAcquire() 方法:如果 node 的前驱节点的 waitStatus 状态是 -1(表示有责任唤醒后继节点),则结束该方法,返回 true;如果 waitStatus 状态是 >0,则将该节点的前驱节点 waitStatus 状态改为 -1,返回 false

    • 进入 parkAndCheckInterrupt():将 node(当前线程)阻塞 park()。图中用灰色表示。

      image-20220906120134725
  5. 经过多个线程经历上述过程竞争失败后,变成如图所示的样子:

    image-20220906120934772
锁重入时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class NonfairSync extends Sync {
// ...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果没有锁重入
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 更新锁重入计数
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
  1. 检查 state 状态,如果 state > 0 时,说明锁已经被别的线程拥有;
  2. 检查 Owner 是否是当前线程,如果是则说明这个锁是当前线程的可重入锁 ,state++

ReentrantLock 的解锁流程

1
2
3
public void unlock() {
sync.release(1);
}
1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
// 存在 Head && Head的waitStatus不为0
// 说明队列中还有正在等待的线程Node
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后继Head的后继节点
return true;
}
return false; // 尝试释放锁失败,返回false
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
// 更新重入计数c
int c = getState() - releases;
// 判断Owner是否是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException(); // 不是则抛出异常
boolean free = false;
// 重入计数=0时
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
没有锁重入时
  1. thread 0 释放锁,进入 tryRelease() 方法:

    • Owner 置为 null

    • state 置为 0

      image-20220906124417481
  2. 现在就可尝试 CASstate 由 0 改为 1,所以进入 if (tryRelease(arg)) {} 代码块:

    • 此时存在 Head 节点(即 head 的引用不为空)且 HeadwaitStatus != 0,所以唤醒 Head 的后继节点 ,unpark(thread 1)

    • 此时 thread 1 被唤醒,恢复执行。因为 thread 1 在被 park() 前在 acquireQueued()for(;;) 死循环内不断尝试获得锁,所以唤醒之后继续在死循环内尝试获得锁;

    • 此时 thread 1 节点的前驱节点是 Head ,且可以成功地尝试获得锁,那么我们可以将 thread 1 节点从等待队列中移除

      image-20220906124634776

发生不公平现象

【注意 ⚠️】

我们考虑这样一个问题:如果在 thread 0 线程刚释放完锁,state = 0 的时候,有一个线程 thread_x 想要获取锁。那么 thread_x 就与等待队列中本来应该被唤醒的 thread 1 产生了竞争。如下图。

image-20220906125528941

如果 thread_x 得到了锁,那么 thread 1 就会被再次 park 。对于等待队列中的线程节点来说就是【不公平的】,因为 thread_x 插队了。

image-20220906130127123
锁重入时
image-20220906130936857
  1. 检查 state 状态,如果 state > 0 时,说明锁已经被别的线程拥有;
  2. 检查 Owner 是否是当前线程,如果是则说明这个锁是当前线程的可重入锁,解锁时 state--
  3. state = 0 时才释放这个锁。

ReentrantLock 可打断原理

不可打断模式(默认)

我们之前说过,线程在没办法立即获得锁时,就会在 acquireQueued() 方法中的死循环中不断尝试获取锁。尝试不成功就会被 park 阻塞。但是我们知道,一个线程被 park() 之后会interrupte() 方法打断阻塞状态,但是打断标记会被清除,以至于可以继续在死循环里运行,直到再次被 park

从下面源码中可以看出,我们引入了一个 interrupted 作为记录是否被打断过的标记。在不可打断模式下,只有在该被打断过的线程得到锁后,才会响应打断 selfInterrupt()

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && // !(能否用CAS试图将state状态由0改为1)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 尝试创建一个Node,将其加入等待对列
selfInterrupt(); // 执行打断(若打断标记为真)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 设置打断标记为 false
for (;;) {
final Node p = node.predecessor(); // p 为当前节点的前驱节点
if (p == head && tryAcquire(arg)) { // 检查p是不是Head && CAS尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 检查尝试获取锁失败后是否阻塞,
// 并将后继节点不是Tail的节点的waitState设为-1,意为有义务唤醒后继节点
parkAndCheckInterrupt()) // 阻塞node,并检查node是否被打断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 我们可以看到,ReentrantLock 底层使用的是 park
return Thread.interrupted();
}

可打断模式

我们可以从如下源码中看出,可打断模式中 doAcquireInterruptibly() 整体与不可打断模式相同,但是在处理打断的过程中:可打断模式会直接抛出一个异常 InterruptedException(),进而打断线程。

1
2
3
4
5
6
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

ReentrantLock 公平锁原理

不公平锁

我们在前面讨论过不公平发生的情况。我们知道,ReentrantLock 默认是不公平锁:

如果在 thread 0 线程刚释放完锁,state = 0 的时候,有一个线程 thread_x 想要获取锁。那么 thread_x 就与等待队列中本来应该被唤醒的 thread 1 产生了竞争。如果 thread_x 得到了锁,那么 thread 1 就会被再次 park 。对于等待队列中的线程节点来说就是【不公平的】,因为 thread_x 插队了。

公平锁

那么 ReentrantLock 如何实现平平锁呢?

通过下面源码我们可以看出:公平锁模式下,线程执行 tryAcquire() 尝试获取锁时,需要判断 hasQueuedPredecessors()检查等待队列是否为空、检查 Head 节点有无后继节点,且该后继节点关联的线程是不是当前线程。若满足则说明 AQS 等待队列里有优先级更高的线程,当前线程不会争抢锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static final class FairSync extends Sync { 
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 检查 AQS 等待队列的情况,检查自己是否有资格获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && // AQS 等待队列是否非空
// 检查Head节点有无后继节点,且该后继节点关联的线程是不是当前线程
((s = h.next) == null || s.thread != Thread.currentThread());
}

ReentrantLock 条件变量实现原理

每个条件变量对应着一个等待队列,它的实现类是 ConditionObject。它维护一个单链表,作为存放那些不满足条件而被阻塞的线程。

对于 ConditionObject,其主要有两个方法:await()singal()

注意 ⚠️

这里我们之所以说 ConditionObject 维护一个单链表,是因为其中的 Node 只有后继节点,不能找到前驱节点。但是它是可以变成双链表的,因为 Node 中有可以找到前驱节点的指针。

await() 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 把线程计入到Condition的等待队列尾部
int savedState = fullyRelease(node); // 把node对应线程上的锁【全部释放】,state=0
// 唤醒同步器等待队列中 Head的后驱节点
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 如果node不在同步器的等待队列中,
// 就一直park这个node关联的线程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break; // 打断模式是在
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

开始时 thread 0 持有锁,

  1. 此时调用 await() 方法,会将当前线程封装成 Node 加入到 ConditionObject 的等待队列尾部,设置其 waitStatus = -2 (Node.CONDITION)
  2. 进入 AQSfullyRelease() 流程,把 node 对应线程上的锁全部释放,并将其 park 阻塞等待;
  3. unpark AQS 队列中 Head 的后继节点,竞争锁。
image-20220906155335560

signal() 流程

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) // 判断 Owner != 当前线程?
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) // 判断 ConditionObject的等待队列非空
doSignal(first); // single 队首节点
}
1
2
3
4
5
6
7
8
9
10
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; // 断开首个节点
} while (!transferForSignal(first) && // 将刚断开的节点转移到同步器的等待队列末尾,
// 并唤醒
(first = firstWaiter) != null); // 如果转移失败(被打断等),
// 尝试single下一个节点
}

由上述源码我们可以看出,

  1. 当当前线程调用 single() 方法时,会将 ConditionObject 的等待队列中队首节点 first doSignal()

    image-20220906163445980
  2. 断开 first 节点,并将其转移至同步器的等待队列末尾,并唤醒等待队列中 Head 的后继节点,更新waitStatus

    image-20220906164006923

ReentrantLock 原理流程图

reentrantLock

ReentrantReadWriteLock 读写锁

ReentrantReadWriteLock,顾名思义,就是可重入的读写锁。我们先来介绍什么是读写锁。

我们知道,在并发地访问同一个临界区资源(如并发地读写同一个数据时),

  • 读 - 读 操作并不影响数据的安全性,
  • 读 - 写 操作写 - 写 操作 需要互斥地进行。

相应的,我们就需要两种锁来保证读写数据的安全性:读锁写锁。而在 ReentrantLock 中提供了一种读写锁 ReentrantReadWriteLock 的具体实现。其使用的示例如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DataOperator {
private Object date;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private ReentrantReadLock readLock = readWriteLock.readLock();
private ReentrantReadLock writeLock = readWriteLock.writeLock();

public Object read() {
readLock.lock();
try {
// TODO: 读取操作
}
finally {
readLock。unlock();
}
}

public void write() {
writeLock.lock();
try {
// TODO: 写操作
}
finally {
writeLock.unlock
}
}
}

ReentrantReadWriteLock 原理

ReentrantReadWriteLock 用的是同一个 Sync 同步器(以下以非公平同步器 NonFairSync 为例),所以他们的等待队列、state 状态等都是同一个。不同点是:写锁对应着 state 状态的16 bits读锁对应着 state 状态的16 bits。即

1
2
3
4
5
|-----------------------------------------------------|
| state |
|-----------------------------------------------------|
| 读锁 state (16 bits) | 写锁 state (16 bits) |
|-----------------------------------------------------|

上锁流程

下面我们从源码角度试图分析 ReentrantReadWriteLock 中上锁的原理:

1
2
3
4
5
6
7
8
9
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

}

写锁】部分源代码

1
2
3
4
5
6
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.acquire(1);
}
}
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) && // !(能否用CAS试图将state状态由0改为1)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 尝试创建一个Node,将其加入等待对列
selfInterrupt(); // 执行打断(若打断标记为真)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState(); // 获取state状态
int w = exclusiveCount(c); // 返回写锁状态(低16位)
if (c != 0) { // state状态不为0,
// 但是在读写锁中有【高16位】与【低16位】之分
if (w == 0 || current != getExclusiveOwnerThread())
// w==0,说明没有写锁,从而说明存在【读锁】
// 但是此时我们想加【写锁】,读写互斥,不满足判断条件
// 然后再判断Owner是否为当前线程
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 写锁的重入超过了最大范围
// 16位,65535
throw new Error("Maximum lock count exceeded");
setState(c + acquires); // 更新state
return true;
}
// state为0
if (writerShouldBlock() || // 判断写锁是否该阻塞
!compareAndSetState(c, c + acquires)) // 尝试CAS加写锁,低16位 0->1
return false;
setExclusiveOwnerThread(current);
return true;
}

读锁部分】源代码:

1
2
3
4
5
6
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.acquireShared(1);
}
}
1
2
3
4
5
6
7
8
9
public final void acquireShared(int arg) {
// tryAcquireShared() 方法返回一个整数int
// 返回 -1:表示获取读锁失败
// 返回 0:夺取读锁成功,且没有后继节点可以被唤醒了;
// 返回 n:夺取读锁成功,且还有n个后继节点可以被唤醒;
// 在读写锁中只有【-1】和【1】两种情况
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ReentrantReadWriteLock{
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();

if (exclusiveCount(c) != 0 && // 检查写锁部分state是否为0
getExclusiveOwnerThread() != current) // Owner是否是当前线程
// 当前线程先加写锁,再加读锁
return -1; // 尝试加读锁失败,返回 -1

// 刚释放写锁后,unpark第一个读线程时:
int r = sharedCount(c); // r = state高十六位
if (!readerShouldBlock() && // 检查读线程是否应该被阻塞
r < MAX_COUNT && // r是否超过计数范围
compareAndSetState(c, c + SHARED_UNIT)) { // state高十六位:加1(65536)
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1; // 读锁加锁成功!
}
return fullTryAcquireShared(current);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 把当前线程封装成Node.SHARED
// 并加入到等待队列中
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
int r = tryAcquireShared(arg); // 再次尝试获取读锁
if (r >= 0) { // 尝试获取成功
setHeadAndPropagate(node, r); // 将node节点移出等待队列。
// 如果【node后继节点: s】是【SHARED】,
// 则继续唤醒s关联的线程,若s也是读线程
// 则也会唤醒s后继节点的线程......
// 也就是说:
// 会一下子释放【连续的读线程】
// |R,R,R,R|,W,R,....
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && // 检查是否应该被park,
// 并设置waitStatus状态,
// 使得前面的节点可以unpark后面的
parkAndCheckInterrupt()) // park当前线程,并一直检查是否有打断
// unpark的线程从这里恢复运行
interrupted = true;
} finally {
if (failed)
cancelAcquire(node);
}
}

由以上的源码我们可知,

  • 请求读锁的线程封装成 Node 节点时,其状态是 Node.SHARED (共享状态)
  • 请求写锁的线程封装成 Node 节点时,其状态是 Node.EXCLUSIVE(独占状态)

下面我们以两个线程 thread_1thread_2 举例:

(一)thread_1 写锁 , thread_2 读锁
  1. thread_1 线程想获得【写锁】,流程与 ReentrantLock 大致无异:
  • 如果 state == 0,说明此时既没有【读锁】也没有【写锁】,则可以加锁(在“公平规则”下,还需要判断等待队列里是否有其他线程);
  • 如果 state高 16 bits 不为 0,即 state = n_0,则说明已经有线程获得了【读锁】。再检查 Owner 是否是当前线程,如果是,则可以加写锁,如果不是,则说明其他线程获得了读锁读-写 互斥,所以无法加锁;
  • 如果 state低 16 bits 不为 0,即 state = 0_n,则说明已经有线程获得了【写锁】,无法加锁。
image-20220908175533788
  1. thread_1 线程成功后加锁后, thread_2 线程想获得【读锁】。流程如下
  • tryAcquireShared() 中,方法返回一个整数 int,返回 -1 时表示获取读锁失败;返回 1 表示成功。

  • 如果 state低 16 bits 不为 0,即 state = 0_n,则说明已经有线程获得了【写锁】。再检查 Owner 是否是当前线程,如果是,则可以加读锁,如果不是,则无法加锁,返回 -1

    image-20220908175337233
  • 如果 tryAcquireShared() 返回 -1,即尝试加读锁失败,则进入 doAcquireShared()

    • 将当前线程成装为 Node.SHARED,并加入同步器的等待队列中;会自旋尝试获得锁;
    • 更新等待队列中节点的 waitStatus 值,-1 为有义务唤醒后继节点;
    • 将当前节点 park
    截屏2022-09-08 18.16.09
(二)thread_3 读锁 , thread_4 写锁

此时又有 thread_3 想获取读锁 , thread_4 想获取写锁,因为 thread_1 并没有释放锁,所以它们都被 park 在等待队列。如下图,

image-20220908181443138

解锁流程

【写锁】解锁的部分源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0) // 如果head的引用不为空,
// 且其waitStatus不为0时
unparkSuccessor(h); // 唤醒该节点的后继节点所关联的线程
// 此时Head后继节点关联的线程被唤醒,追踪:
// 读锁:doAcquireShared的for循环中
// 写锁:acquireQueued的for循环中
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // state低十六位:减1
boolean free = exclusiveCount(nextc) == 0; // state低十六位是否为0
if (free) // 若为0
setExclusiveOwnerThread(null); // Owner = null
setState(nextc); // 若不为0,更新重入计数
return free;
}

【读锁】解锁的部分源码如下:

1
2
3
4
5
public static class ReadLock implements Lock, java.io.Serializable {
public void unlock() {
sync.releaseShared(1);
}
}
1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 重入计数nextc是否为0
doReleaseShared();
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 只能将读锁全部释放完
int c = getState();
int nextc = c - SHARED_UNIT; // 重入计数nextc = state高十六位:减1
if (compareAndSetState(c, nextc)) // 尝试CAS将c设置成更新后的nextc
return nextc == 0; // 返回重入计数nextc是否为0
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 判断等待队列非空
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 判断Node(null)节点的waitStatus是否为-1?
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 将Node(null)节点的waitStatus改为0
// 目的是为了防止其他线程一起unpark,从而出错
continue;
unparkSuccessor(h); // unpark Node(null)节点的后继节点
// 此时Head后继节点关联的线程被唤醒,追踪:
// 读锁:doAcquireShared的for循环中
// 写锁:acquireQueued的for循环中
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
(一)thread_1 释放写锁
  1. thread_1 尝试释放写锁 tryRelease(),将 state低16 bits 减 1。若减为 0,则将 Owner 置为空;若不为 0,则说明存在**写锁重入**;更新 state

    image-20220908192405308
  2. 接下来开始唤醒等待队列中的线程,即让 thread_2 恢复运行。此时 thread_2doAcquireShared()for(;;){} 循环中 parkAndCheckInterrupt() 处继续运行:

    • 再执行一轮 for(;;){} 循环,当执行到 tryAcquireShared() 时,state高16 bits 加 1;如下图

      image-20220908193656968
    • thread_2 继续运行,调用 setHeadAndPropagate(node, 1),将 thread_2 的节点(node)从队列里移出,更新 waitStatus 状态;如下图

    image-20220908194459930
    • 【⚠️ 此时唤醒并没有结束】检查 Node(null) 的后继节点的属性,如果是 Node.SHARED,会继续唤醒其后继节点,直到不为 SHARED;被唤醒的线程仍然从 park 的地方开始运行(步骤2),如此一来就会连续唤醒多个相连的 Node.SHARED;如下图

      image-20220908195341408
(二)thread_2, thread_3 释放读锁
  1. thread_2 首先解锁:thread_2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1),使 state 高十六位减 1,但此时 state 高十六位并不为 0;此时就完成了一个读锁的释放。

    image-20220908222320760
  2. thread_3 解锁:与上一个流程一致。thread_3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1),使 state 高十六位减 1,但此时 state 高十六位为 0,此时就进入 deReleaseShared(1)

    image-20220908223005625
    • thread_3 进入 deReleaseShared(1) 中,检查 Node(null)waitStatus 状态是否为 -1,若是,将其改为 0,并唤醒他的后继节点;

      image-20220908223614489
    • thread_4 在之前 park 的地方被唤醒(acquireQueued()for(;;){} 循环中)

  3. thread_4 被唤醒,按照之前的流程尝试加锁。

    image-20220908224221269