本文主要通过以下几个方面来系统的介绍 Java 并发多线程编程:

  1. 线程与进程
  2. Java 线程
  3. 共享模型之 Monitor
  4. 共享模型之 JMM
  5. 共享模型之 无锁(乐观锁)
  6. 并发工具之 线程池

【参考资料】:

视频课程: BiliBili - 黑马程序员Java并发编程 JUC

笔记参考: Java并发 - Nyima’s Blog

  • 注:本课程笔记基于 JDK8,采用 lsf4j 打印日志

1 线程与进程

1.1 进程

  • 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
  • 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
  • 进程就可以视为程序的一个实例(程序是静态的,进程是动态的)。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)

1.2 线程

  • 一个进程可以分为一到多个线程。
  • 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行 。
  • Java 中,线程作为最小调度单位(执行指令),进程作为资源分配的最小单位。 在 Windows 中进程是不活动的,只是作 为线程的容器

1.3 两者对比

  • 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
  • 进程拥有共享的资源,如内存空间等,供其内部的线程共享
    • 进程间通信较为复杂 同一台计算机的进程通信称为 IPC(Inter-process communication)
    • 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
  • 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
  • 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
1.3.1 进程和线程的切换
上下文切换

内核为每一个进程维持一个上下文。**上下文就是内核重新启动一个被抢占的进程所需的状态。**包括以下内容:

  • 通用目的寄存器
  • 浮点寄存器
  • 程序计数器
  • 用户栈
  • 状态寄存器
  • 内核栈
  • 各种内核数据结构:比如描绘地址空间的页表,包含有关当前进程信息的进程表,以及包含进程已打开文件的信息的文件表
进程切换和线程切换的主要区别

最主要的一个区别在于进程切换涉及虚拟地址空间的切换而线程不会。因为每个进程都有自己的虚拟地址空间,而线程是共享所在进程的虚拟地址空间的,因此同一个进程中的线程进行线程切换时不涉及虚拟地址空间的转换

页表查找是一个很慢的过程,因此通常使用cache来缓存常用的地址映射,这样可以加速页表查找,这个cache就是快表TLB(translation Lookaside Buffer,用来加速页表查找)。由于每个进程都有自己的虚拟地址空间,那么显然每个进程都有自己的页表,那么当进程切换后页表也要进行切换,页表切换后TLB就失效了,cache失效导致命中率降低,那么虚拟地址转换为物理地址就会变慢,表现出来的就是程序运行会变慢,而线程切换则不会导致TLB失效,因为线程线程无需切换地址空间,因此我们通常说线程切换要比较进程切换快

而且还可能出现缺页中断,这就需要操作系统将需要的内容调入内存中,若内存已满则还需要将不用的内容调出内存,这也需要花费时间

为什么TLB能加快访问速度

快表可以避免每次都对页号进行地址的有效性判断。快表中保存了对应的物理块号,可以直接计算出物理地址,无需再进行有效性检查

1.4 并行与并发

  1. 单核cpu 下,线程实际还是串行执行的。操作系统中有一个组件叫做任务调度器,将cpu 的时间片(windows 下时间片最小约为 15 毫秒)分给不同的线程使用,只是由于 cpu在线程间(时间片很短)的切换非常快,人类感觉是同时运行的。总结为一句话就是:“微观串行,宏观并行
    一般会将这种线程轮流使用 CPU的做法称为并发,Concurrent。

  2. 多核cpu下,每个核心都可以调度运行线程,此时线程间是可以并行(Parallel)的。

并发(Concurrent) 是一个CPU在不同的时间去不同线程中执行指令。

并行(Parallel)是多个CPU同时处理不同的线程。

1.5 应用

1.5.1 应用之异步调用(案例1)

调用方角度来讲,如果

  • 需要等待结果返回,才能继续运行就是同步
  • 不需要等待结果返回,就能继续运行就是异步

注意:同步在多线程中还有另外一层意思,即让多个线程步调一致

1) 设计

多线程可以让方法执行变为异步的(即不要巴巴干等着)比如说读取磁盘文件时,假设读取操作花费了 5 秒钟,如果没有线程调度机制,这 5 秒 cpu 什么都做不了,其它代码都得暂停…

一般来讲,大文件的读写、耗时较长的工作可以异步执行

2) 结论
  • 比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程
  • tomcat 的异步 servlet 也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞
  • tomcat 的工作线程 ui 程序中,开线程进行其他操作,避免阻塞 ui 线程
1.5.2 应用之提高效率(案例1)

充分利用多核 cpu 的优势,提高运行效率。想象下面的场景,执行3个计算,最后将计算结果汇总。

1
2
3
4
计算 1 花费	10ms
计算 2 花费 11ms
计算 3 花费 9ms
汇总需要 1ms

如果是串行执行,那么总共花费的时间是 10 + 11 + 9 + 1 = 31ms

  • 但如果是四核 cpu,各个核心分别使用线程1 执行计算1,线程2执行计算2,线程3执行计算3,那么3个线程是并行的,花费时间只取决于最长的那个线程运行的时间,即11ms 。最后加上汇总时间,只会花费12ms

注意:需要在多核cpu 才能提高效率,单核仍然时是轮流执行

结论
  1. 单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用 cpu ,不至于一个线程总占用 cpu,别的线程没法干活
  2. 多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
    • 有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任 务都能拆分(参考后文的【阿姆达尔定律】)
    • 也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
  3. IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 CPU,但需要一 直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化。

2 Java 线程

2.1 创建和运行线程

方法一:直接使用构造器创建一个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
public class CreateThread {
public static void main(String[] args) {
Thread myThread = new Thread() {
@Override
public void run() {
System.out.println("my thread running...");
}
};

myThread.start(); // 启动线程
System.out.println("main thread running...");
}
}
  • 使用继承方式的好处是,在run()方法内获取当前线程直接使用 this 就可以了,无须使用 Thread.currentThread()方法;
  • 不好的地方是 Java 不支持多继承,如果继承了 Thread,那么就不能再继承其他类。另外任务与代码没有分离,当多个线程执行一样的任务时需要多份任务代码
方法二:使用 Runnable 配合 Thread(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Test2 {
public static void main(String[] args) {
//创建线程任务
Runnable runnableInstance = new Runnable() {
@Override
public void run() {
System.out.println("Runnable running");
}
};

//将Runnable对象传给Thread
Thread t = new Thread(runnableInstannce);

//启动线程
t.start();
}
}

通过实现 Runnable 接口,并且实现 run()方法。好处是将任务和线程分离,更为灵活。在创建线程时作为参数传入该类的实例即可。

方法二的简化:使用lambda表达式简化操作

只有一个抽象方法的接口可以用 @FunctionalInterface 注解。当有该注解时,可以使用 lambda 来简化操作。所以方法二中的代码可以被简化为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Test2 {
public static void main(String[] args) {
//创建线程任务
Runnable r = () -> {
//直接写方法体即可
System.out.println("Runnable running");
System.out.println("Hello Thread");
};

//将Runnable对象传给Thread
Thread t = new Thread(r);

//启动线程
t.start();
}
}

还可以继续简化上述代码:

1
2
3
4
5
6
7
public class Test3 {
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("Hello Thread!");
},"name_of_thread").start();
}
}

在 IDEA 中,可以在 Runnable 上使用Alt + Enter自动转换成 lambda 表达式。

Thread 与 Runnable 的关系

分析 Thread 的源码,理清它与 Runnable 的关系。

【方法一】

  • 首先,Thread 实现了 Runnable 接口,我们可以类比静态代理,将 new Thread 看作是代理对象。
1
public class Thread implements Runnable {...}
  • 在方法 1 中,我们在 new Thread 对象中重写了 run() 方法。

【方法二】

  • 在方法 2 中,Thread 的构造方法如下,会传入一个 Runnable target 对象
1
2
3
4
5
6
7
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

private void init(ThreadGroup g, Runnable target, String name,long stackSize) {
init(g, target, name, stackSize, null, true);
}
  • 而在 Threadrun() 方法中,会先判断 Runnable target 对象是否为空,如果非空,就调用 target.run() 方法,
1
2
3
4
5
public void run() {
if (target != null) {
target.run();
}
}

Runnable 更容易与线程池等高级 API 配合用 Runnable 让任务类脱离了 Thread 继承体系,更灵活。Java 推荐组合优于继承

方法三:使用 FutureTaskThread 结合

分析 FutureTask 源码我们可以知道, FutureTask 间接实现了 RunnableFuture 接口,所以他也可以像方法二一样创建一个线程。 Future 接口的 get() 方法可以用来返回任务的执行结果。

1
public class FutureTask<V> implements RunnableFuture<V> {...}
1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

FutureTask 接收返回值

此外, Runnablerun() 方法没有返回值,这样就没有办法在两个线程之间传递返回值。所以 FutureTask 可以与 Callable 接口结合使用,就可以将任务的结果转递给其他线程

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

使用 FutureTask 可以用泛型指定线程的返回值类型(Runnablerun() 方法没有返回值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//需要传入一个 Callable 对象,使得任务结束时可以返回值
FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("线程执行!");
Thread.sleep(1000);
return 100;
}
});

Thread r1 = new Thread(task, "t2");
r1.start();

//获取线程中方法执行后的返回结果
int res = task.get() // 阻塞在此处等待结果的返回
System.out.println(res);
}
}

【总结】

使用继承方式的好处是方便传参,你可以在子类里面添加成员变量,通过 set() 方法设置参数或者通过构造函数进行传递,而如果使用 Runnable 方式,则只能使用主线程里面被声明为 final 的变量。

不好的地方是 Java 不支持多继承,如果继承了 Thread 类,那么子类不能再继承其他类,而 Runable 则没有这个限制。前两种方式都没办法拿到任务的返回结果,但是 FutureTask 方式可以

2.2 原理之线程运行

2.2.1 栈与栈帧

Java Virtual Machine Stacks (Java 虚拟机栈) 我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?

  • 栈内存 \to 线程。其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存
  • 栈帧 \to 调用方法。每个栈由多个栈帧 (Frame) 组成,对应着每次方法调用时所占用的内存
  • 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
JVM 工作流程:

当我们运行一个程序(类)时,

  1. 我们先执行【类加载】,把该类的字节码放入方法区;
  2. JVM 启动【主线程】,并且为其分配一个【主线程栈】内存空间;
  3. 从【主方法】开始,JVM 为主方法在主线程栈内分配一个【主方法栈帧】内存;
  4. 而在这个【主方法栈帧】内存中,存在着【局部变量表】【操作数栈】【锁记录】等
  5. 【局部变量表】用来存储方法内的局部变量;
  6. 当我们调用其他方法时,JVM 会为其在【主线程栈】中分配一块新的【栈帧】内存;
  7. 当我们【new】一个对象时,这个对象会被存储在【堆】,并将该对象的【引用】地址存入相应的地方(局部变量表等)
  8. 当我们创建一个线程时,JVM 会为其分配一个新的【栈】内存空间,不同的栈内存互不干扰。
2.2.2 线程上下文切换

因为以下一些原因导致 CPU 不再执行当前的线程,转而执行另一个线程的代码

【被动】

  • 线程的 CPU 时间片用完
  • 垃圾回收(“Stop the world”)
  • 有更高优先级的线程需要运行

【主动】

  • 线程自己调用了 sleepyieldwaitjoinparksynchronizedlock 等方法

当【上下文切换】发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念就是程序计数器(Program Counter Register),它的作用是记住下一条 JVM 指令的执行地址,是线程私有的

  • 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
  • 上下文切换频繁发生会影响性能

2.3 常见方法

方法名 功能说明 注意
start() 启动一个新线程,在新线程上运行run()方法中的代码 start() 方法只是让线程进入【就绪状态】,里面代码不一定立刻运行有可能 CPU 的时间片还没分给它)。每个线程对象的**start()方法只能调用一次**,如果调用了多次会出现 IllegalThreadStateException
run() 新线程启动后会调用的方法 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的run() 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象,来覆盖默认行为
join() 阻塞等待线程运行结束
join(long n) 阻塞等待线程运行结束最多等待 n 毫秒
getId() 获取线程长整型的id· |id` 唯一
getName() 获取线程名
setName(String) 修改线程名
getPriority() 获取线程优先级
setPriority(int) 设置线程优先级 java中规定线程优先级是 1~10 的整数,较大的优先级能提高该线程被 CPU 调度的机率
getState() 获取线程状态 Java 中线程状态使用6个 enum 表示: NEWRUNNABLEBLOCKEDWATINGTIMED_WAITINGTERMINATED
isInterrupted() 判断是否被打断 不会清除打断标记
isAlive() 线程是否存活(还没有运行完毕)
interrupt() 打断线程 如果被打断线程正在 sleep, wait, join 会导致被打断的线程抛出 InterruptedException,并清除打断标记;如果打断的正在运行的线程,则会设置打断标记park 的线程被打断,也会设置打断标记
currentThread() 获取当前正在执行的线程
sleep(long n) 让当前执行的线程休眠 n 毫秒,休眠时让出 CPU 的时间片给其它线程 不会释放操作
yield() 提示线程调度器让出当前线程对 CPU 的使用 主要是为了测试和调试
2.3.1 start() vs run()
  • start() 用来启动线程, run() 是线程启动之后要执行的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Test4 {
public static void main(String[] args) {
Thread t1 = new Thread("t1") {
@Override
public void run() {
System.out.println("RUNNING...");
}
}

// 不启动t1线程就直接run()
t1.run();
System.out.println(t1.getState());
}
}
1
RUNNING...

我们可以看到,貌似没有 start() 来启动 t1 线程, run() 也成功执行了。但是我们需要注意,这里的 run() 方法其实是主线程执行的。

被创建的 Thread 对象不启动直接调用重写的 run() 方法时, run() 方法是在主线程中被执行的,而不是在我们所创建的线程中执行。

所以如果想要在所创建的线程中执行 run() 方法,需要使用 Thread 对象的 start() 方法。

2.3.2 sleep() vs yield()
  • sleep() :当前线程从 Running 运行状态 \to Timed Waiting 阻塞状态 \stackrel{一段时间}{\longrightarrow} Runnable 就绪状态
  • yield():当前线程从 Running 运行状态 \to Runnable 就绪状态

个人理解:

sleep() 更偏向于阻塞一段时间,之后等待 CPU 执行到它再进入就绪状态;而 yield() 是让出当前这一次执行,进入就绪状态,不影响下一次执行。

sleep() 使线程阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Test5 {
public static void main(String[] args) {
Thread t1 = new Thread("t1") {
@Override
public void run() {
try {
System.out.println("t1开始sleep");
Thread.sleep(2000);
System.out.println("t1结束sleep");
} catch (InterruptedException e) {
System.out.println("t1被唤醒");
e.printStackTrace();
}
}
};
t1.strat();

Thread.sleep(1000);
System.out.println(t1.getState());
t1.interrupt(); // 打断t1的sleep
}
}
1
2
3
t1开始sleep
t1: Timed Waiting
t1被唤醒
  1. 调用 sleep() 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),可通过state()方法查看

  2. 其它线程可以使用 interrupt() 方法打断正在睡眠的线程,这时 sleep() 方法会抛出 InterruptedException

  3. 睡眠结束后的线程未必会立刻得到回到 Runnable 状态

  4. 建议用 TimeUnitsleep() 代替 Thread 的 sleep() 来获得更好的可读性 。如:

    1
    2
    3
    4
    5
    6
    //休眠一秒
    TimeUnit.SECONDS.sleep(1);
    Thread.sleep(1*1000);
    //休眠一分钟
    TimeUnit.MINUTES.sleep(1);
    Thread.sleep(1*60*1000);
yield() 让出当前线程
  1. 调用 yield() 会让当前线程从 Running 运行状态进入 Runnable 就绪状态(仍然有可能被执行),然后调度执行其它线程
  2. 具体的实现依赖于操作系统的【任务调度器】
线程优先级
  • 线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它

  • 如果 CPU 比较忙,那么优先级高的线程会获得更多的时间片,但 CPU 闲时,优先级几乎没作用

  • 设置方法:

    1
    thread1.setPriority(Thread.MAX_PRIORITY); //设置为优先级int(1~10)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
Runnable task1 = () -> {
int count = 0;
while(true) {
System.out.println("----> task1" + count++);
}
};

Runnable task2 = () -> {
int count = 0;
while(true){
Thread.yield();
System.out.println("===== task2" + count++);
}
};

Thread t1 = new Thread(task1, "t1");
Thread t2 = new Thread(task1, "t2");

t1.start();
t2.start();
}

/*
输出结果应为 t1打印的count 远大于 t2打印的count
*/
2.3.3 join()

我们先分析如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test5 {
static int res = 0;
public static void main(String[] args) {
Thread t1 = new Thread("t1") {
@Override
public void run() {
try {
System.out.println("t1开始sleep");
Thread.sleep(1000);
System.out.println("t1结束sleep");
res = 10;
}
}
};

t1.strat();
System.out.println(res);
}
}
1
0

由于两个线程的执行次序与执行时间(操作系统线程调度)并不确定,所以我们不能够确定 res 取值。我们可以用 join() 方法等待其他线程的结束。流程图如下

graph TD
A1[main]--瞬间完成-->E1[r=0]
E1 --> B1[t1.join]
A1 --> C1[t1.start]
C1 --1s后---> D1[r=10]
D1 --t1结束--> B1
B1 --> R1[res: r=10]

join()的底层原理就是 wait()

用于等待某个线程结束。哪个线程内调用 join() 方法,就等待哪个线程结束,然后再去执行其他线程。

如在主线程中调用 ti.join(),则是主线程等待 t1 线程结束。

1
2
3
4
5
Thread thread = new Thread();
//等待thread线程执行结束
thread.join();
//最多等待1000ms,如果1000ms内线程执行完毕,则会直接执行下面的语句,不会等够1000ms
thread.join(1000);
2.3.4 interrupt()

用于打断阻塞 (sleep()wait()join() …) 以及正常运行的线程。 处于阻塞状态的线程,CPU 不会给其分配时间片。

  • 如果一个线程在在运行中被打断,打断标记会被置为 true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test6 {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread("t1") {
@Override
public void run() {
System.out.println("t1开始运行");
while(true){
if (this.isInterrupted()) {
break;
}
}
}
};
t1.start();

Thread.sleep(1000);
t1.interrupt(); // 打断t1的sleep
System.out.println("t1的打断标记:" + t1.isInterrupted());
}
1
2
t1开始运行
t1的打断标记:true
  • 如果是打断因sleep(), wait(), join()方法而被阻塞的线程,会将打断标记会被清空,置为 false
1
2
//用于查看打断标记,返回值被boolean类型
t1.isInterrupted();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Test6 {
public static void main(String[] args) {
Thread t1 = new Thread("t1") {
@Override
public void run() {
try {
System.out.println("t1开始sleep");
Thread.sleep(5000);
System.out.println("t1结束sleep");
} catch (InterruptedException e) {
System.out.println("t1被打断");
e.printStackTrace();
}
}
};
t1.strat();

Thread.sleep(1000);
t1.interrupt(); // 打断t1的sleep
System.out.println("t1的打断标记:" + t1.isInterrupted());
}
}
1
2
3
t1开始sleep
t1被打断
t1的打断标记:false

正常运行的线程在被打断后,不会停止,会继续执行。如果要让线程在被打断后停下来,需要使用打断标记来判断

1
2
3
4
5
while(true) {
if(Thread.currentThread().isInterrupted()) {
break;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void main(String[] args) throw InterruptedException{
Thread t1 = new Thread(() -> {
while(true) {
if(Thread.currentThread().isInterrupted()) {
break;
}
}
}, "t1");

t1.start();

Thread.sleep(1000);
Log.debug("interrupted");
t1.interrupt();
}
interrupt() 的应用——两阶段终止模式

当我们在执行线程一时,想要终止线程二,这是就需要使用 interrupt() 方法来优雅的停止线程二(料理后事: 释放锁和临界资源等)。

【错误思路】

  • stop():该方法会强制杀死线程,该方法虽然能在进程结束后立即释放锁对象,但是这种方法会破坏 run() 代码块的原子性(synchronized 保护其原子性),造成程序的逻辑错误(运行一半被突然恶意打断)。
  • suspend():暂停线程(挂起)和 resume()恢复线程,若线程对临界区资源上锁,则永远解锁,其他线程也无法获取锁。
  • System.exit(int):该方法会直接停止进程

【两阶段终止模式】

graph TD

A["while(true)"]
B{是否被打断?}
C[料理后事]
D[睡眠2s]
E(结束循环)
F[执行监控记录]
G["设置打断标记 = true"]

A --> B
B --yes--> C
B --no--> D
C --> E
D --无异常--> F
D --有异常--> G
F --> A
G --> A
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*  两阶段终止模式(Two Phase Termination):在进程T1中终止进程T2
在终止进程T2之前让T2释放锁和临界资源
不用stop() 和 System.exit()
*/
class TwoPhaseTermination {
// 监控线程
privite Thread monitor;

// 启动监控线程
public void start() {
monitor = new Thread (() -> {
while (true) {
Thread current = Thread.currentThread();
if (current.isInterrupted()) {
// TODO 释放锁和临界资源
System.out.println("释放锁和临界资源");
break;
}
try {
Thread.sleep(1000);
//TODO 正常功能的代码块
} catch (InterruptedException e) {
e.printStackTrace();
// 若sleep时被打断,会捕获错误e,此时的isInterrupted标记为false,程序会继续执行。所以有以下操作
current.interrupt(); // 重新设置isInterrupted打断标记, true -> false
}
}
});
monitor.start();
}

// 停止监控线程
public void stop() {
monitor.interrupt();
}
}
2.3.5 主线程与守护线程
  • 主线程: 当 JAVA 进程中有多个线程在执行时,只有当所有非守护线程都执行完毕后,JAVA进程才会结束。

  • 守护线程: 但当非守护线程全部执行完毕后,守护线程无论是否执行完毕,也会一同结束。(垃圾回收线程)

1
2
//将线程设置为守护线程, 默认为false
Thread.setDaemon(true);

2.4 线程的状态

2.4.1 五种状态(操作系统层面)
20200608144606
  1. 【初始状态】:在语言层面创建了线程对象,但未与操作系统线程关联
  2. 【可运行状态】:(就需状态),已与操作系统线程关联,可由 cpu 调度执行
  3. 【运行状态】:获得 cpu 时间片,正在执行。
    • 当 cpu 分配的时间片用完,由【运行状态】转为【可运行状态】,导致线程的上下文切换
  4. 【阻塞状态】:该状态下的线程不会占用 cpu ,会导致线程的上下文切换
    • 等阻塞操作结束,系统唤醒阻塞状态,切换至【可运行状态】
  5. 【终止状态】:表示线程已经执行完毕,线程不会再转换成其他状态
2.4.2 六种状态(JAVA API 层面)
20200608144621

详见第三章 3.9【线程状态转换】

  1. NEW】:线程被创建,还没有调用strat()方法

  2. RUNNABLE】:调用了start()方法后。该状态涵盖了操作系统层面的*【可运行状态】、【运行状态】和【阻塞状态】*

  3. 三种“Java 中的阻塞状态”:后面会在状态转换一节详述。

    (1) 【BLOCKED】:synchronized 等待获得锁时

    (2)【WAITING】:join()

    (3)【TIMED_WAITING】:sleep()

  4. TERMINATED】:代码运行结束

2.5 习题

2.5.1 统筹方法(烧水泡茶)

阅读华罗庚《统筹方法》,给出烧水泡茶的多线程解决方案,提示:

  • 参考图二,用两个线程(两个人协作)模拟烧水泡茶过程
    • 文中办法乙、丙都相当于任务串行
    • 而图一相当于启动了 4 个线程,有点浪费
  • sleep(n)模拟洗茶壶、洗水壶等耗费的时间

附:华罗庚《统筹方法》


统筹方法,是一种安排工作进程的数学方法。它的实用范围极广泛,在企业管理和基本建设中,以及关系复杂的科研项目的组织与管理中,都可以应用。

怎样应用呢?主要是把工序安排好。

比如,想泡壶茶喝。当时的情况是:开水没有;水壶要洗,茶壶、茶杯要洗;火已生了,茶叶也有了。
怎么办?

  • 办法甲:洗好水壶,灌上凉水,放在火上;在等待水开的时间里,洗茶壶、洗茶杯、 拿茶叶;等水开了,泡茶喝。
  • 办法乙:先做好一些准备工作,洗水壶,洗茶壶茶杯,拿茶叶;一切就绪,灌水烧水;坐待水开了,泡茶喝。
  • 办法丙:洗净水壶,灌上凉水,放在火上,坐待水开;水开了之后,急急忙忙找茶叶,洗茶壶茶杯,泡茶喝。

哪一种办法省时间?我们能一眼看出,第一种办法好,后两种办法都窝了工。

这是小事,但这是引子,可以引出生产管理等方面有用的方法来。

水壶不洗,不能烧开水,因而洗水壶是烧开水的前提。没开水、没茶叶、不洗茶壶茶杯,就不能泡茶,因而这些又是泡茶的前提。它们的相互关系,可以用下图来表示:

graph LR;
A[洗水壶 1min]--> B[烧开水 15mins];
B--> R[泡茶];
C[洗茶壶 1min]--> R;
D[洗茶杯 1min]--> R;
E[拿茶叶 1min]--> R;

从这个图上可以一眼看出,办法甲总共要16分钟(而办法乙、丙需要20分钟)。如果要缩短工时.提高工作效率,应当主要抓烧开水这个环节,而不是抓拿茶叶等环节。同时,洗茶壶茶杯、拿茶叶总共不过3分钟,大可利用“等水开”的时间来做。

是的,这好像是废话,卑之无甚高论。有但稍有变化,临事而迷的情况,常常是存任的。在近代工业的错综夏杂的工艺过程中,往往就不是像泡茶喝这么简单了。任务多了,几百几千,甚至有好几万个任务。关系多了,错综复杂,千头万绪,往往出现 “万事俱备,只欠东风” 的情况。由于一两个零件没完成,耽误了一台复杂机器的出厂时间。或往往因为抓的不是关键,连夜三班,急急忙忙,完成这一环节之后,还得等待旁的环节才能装配。

洗茶壶,洗茶杯,拿茶叶,或先或后,关系不大,而且同是一个人的活儿,因币可以合并成为:

graph LR;
A[洗水壶 1min]--> B[烧开水 15mins];
B--> R[泡茶];
C[洗茶壶, 洗茶杯, 拿茶叶 3mins]--> R;

看来这是“小题大做”,但在工作环节太多的时候,这样做就非常必要了。

这里讲的主要是时间方面的事,但在具体生产实践中,还有其他方面的许多事。这种方法里然不一定能直接解决所有问题,但是,我们利用这种方法来考虑问题,也是不无裨益的。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
pubilc static main(String[] args) {
Thread t1 = new Thread(() -> {
Log.debug("洗水壶");
Thread.sleep(1000);
Log.debug("烧开水");
Thread.sleep(15000);
}, "小明");

Thread t2 = new Thread(() -> {
Log.debug("洗茶壶");
Thread.sleep(1000);
Log.debug("洗茶杯");
Thread.sleep(1000);
Log.debug("拿茶叶");
Thread.sleep(1000);
try {
t1.join(); // 由 小王 来泡茶
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "小王");

t1.start();
t2.start();
}

输出:

1
2
3
4
5
6
19:19:37.547	[小明]	c.TestMakeTea -	洗茶壶
19:19:37.547 [小王] c.TestMakeTea - 洗水壶
19:19:38.552 [小明] c.TestMakeTea - 洗茶杯
19:19:38.552 [小王] c.TestMakeTea - 烧开水
19:19:49.553 [小明] c.TestMakeTea - 拿茶叶
19:19:53.553 [小明] c.TestMakeTea - 泡茶

解法1的缺陷:

  • 上面模拟的是小明等小王的水烧开了,小明泡茶,如果反过来要实现小王等小明的茶叶拿来了,小王泡茶呢?代码最好能适应两种情况
  • 上面的两个线程其实是各执行各的,如果要模拟小王把水壶交给小明泡茶,或模拟小明把茶叶交给小王泡茶

2.6 小结

本章的重点在于掌握

  • 线程的四种创建方式:
    1. 使用构造器 new Thread(),重写其中的 run() 方法
    2. 重写 Runnable 接口的 run() 方法,再将其(任务)作为参数传入 new Thread()。线程和任务可以分离。
    3. 重写 Callable 接口的 V call() 方法,再将 FutureTask<>(new Callable) 任务作为参数传入 new Thread()。这样不仅可以实现线程和任务的分离,还可以返回线程的结果。
    4. 使用线程池。
  • 线程重要api, 如start, run, sleep, join, interrupt
  • 线程状态:操作系统的五种,Java 层面的六种
  • 应用方面
    • 异步调用:主线程执行期间,其它线程异步执行耗时操作
    • 提高效率:并行计算,缩短运算时间
    • 同步等待:join
    • 统筹规划:合理使用线程,得到最优效果
  • 原理方面
    • 线程运行流程:栈、栈帧、上下文切换、程序计数器
    • Thread 三种创建方式的源码
  • 模式方面
    • 两阶段终止

3 共享模型之 Monitor

3.1 共享带来的问题

Java 代码中的体现

两个线程对初始值为 0 的全局静态变量。一个做自增操作, 一个做自减操作,各执行5000次, 最终的结果是0吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int counter = 0;

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
for (int i = 0; i<5000; i++){
counter++;
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int i = 0; i<5000; i++){
counter--;
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();

Log.debug("{}", counter);
}

测试结果大概率不为 0

sequenceDiagram
    participant 线程1
    participant 线程2
    participant 静态变量 i
    
    静态变量 i ->> 线程2 :getstatic i 读取0
    线程2 ->> 线程2 :iconst_1 准备常数 1
    线程2 ->> 线程2 :isub 减法, 线程内 i= -1
    线程2 -x 静态变量 i :!!!还没来得及将i的结果写入
    线程2 -->> 线程1 :上下文切换
    线程1 ->> 线程1 :iconst_1 准备常数 1
    线程1 ->> 线程1 :iadd 加法, 线程内 i= 1
    线程1 ->> 静态变量 i :putstatic i 写入 1
    线程1 -->> 线程2 :上下文切换
    线程2 ->> 静态变量 i :putstatic i 写入 -1

最终结果为 -1.

3.1.1 临界区 Critical Section
  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源
    • 多个线程读共享资源其实也没有问题
    • 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
  • 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区

例如,下面代码中的临界区

1
2
3
4
5
6
7
8
9
10
11
12
13
static int counter = 0; // 临界资源

static void increment()
// 临界区
{
counter++;
}

static void decrement()
// 临界区
{
counter--;
}
3.1.2 竞态条件 Race Condition

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

3.2 Synchronized 解决方案

3.2.1 解决手段

为了避免临界区的竞态条件发生,有多种手段可以达到目的。

  • 【阻塞式】的解决方案:synchronizedLock
  • 【非阻塞式】的解决方案:原子变量

本次课使用阻塞式的解决方案:synchronized,来解决上述问题,即俗称的**【对象锁】**,它采用互斥的方式让同一 时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住(blocked)。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。

3.2.2 Synchronized 语法
1
2
3
synchronized(Object) { // 例如, 当线程1持有锁时, 线程2被阻塞; 等待线程1释放锁
//临界区
}

举例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int counter = 0; 
//创建一个公共对象,作为对象锁的对象
static final Object room = new Object();

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) { // 上锁
counter++;
}
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) { // 上锁
counter--;
}
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}

程序流程如下图:

sequenceDiagram
    participant 线程1
    participant 线程2
    participant 静态变量 i
    participant 🔒锁对象
    
    线程2 ->> 🔒锁对象 :尝试获取锁
    Note over 🔒锁对象,线程2 :拥有锁
    静态变量 i ->> 线程2 :getstatic i 读取0
    线程2 ->> 线程2 :iconst_1 准备常数 1
    线程2 ->> 线程2 :isub 减法, 线程内 i= 0-1 = -1
    线程2 -->> 线程1 :上下文切换
    线程1 ->> 🔒锁对象 :尝试获取锁, 但是被阻塞(BLOCKED)
    线程1 -->> 线程2 :上下文切换
    线程2 ->> 静态变量 i :putstatic i 写入 -1
    Note over 🔒锁对象,线程2 :拥有锁
    线程2 ->> 🔒锁对象 :释放锁, 并唤醒阻塞的线程
    Note over 🔒锁对象,线程1 :拥有锁
    静态变量 i ->> 线程1 :getstatic i 读取-1
    线程1 ->> 线程1 :iconst_1 准备常数 1
    线程1 ->> 线程1 :iadd 加法, 线程内 i= -1+1 = 0
    线程1 ->> 静态变量 i :putstatic i 写入 0
    Note over 🔒锁对象,线程1 :拥有锁
    线程1 ->> 🔒锁对象 :释放锁, 并唤醒阻塞的线程

【思考】

synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

将上一个代码块的例子解耦,使其资源类和任务类分开:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Room {
private int counter = 0;

public void increment() {
synchronized (this) {
counter++;
}
}

public void decrement() {
synchronized (this) {
counter--;
}
}

public int getCounter() {
synchronized (this) {
return counter;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.increment();
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.decrement();
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",room.getCounter());
}
3.2.3 Synchronized 在方法上

我们必须知道, Synchronized 锁住的始终是对象. 以下两个例子看似是与方法有关, 实则依然是为对象加锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
// synchronized 加在成员方法上
class Test{
public synchronized void test() {
}
}

//等价于
class Test{
public void test() {
synchronized(this) { // 锁住 this 对象
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
// synchronized 加在静态方法上
class Test{
public synchronized static void test() {
}
}

//等价于
class Test{
public static void test() {
synchronized(Test.class) { // 锁住 Test 类对象
}
}
}

注意:

  • sleep() 方法不会释放 synchronized 锁。

3.3 变量的线程安全分析

3.3.1 成员变量和静态变量的线程安全分析
  • 如果没有变量没有在线程间共享,那么变量是安全的;
  • 如果变量在线程间共享
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全
3.3.2 局部变量线程安全分析
  • 局部变量被初始化为【基本数据类型】是安全的;

  • 局部变量引用的对象 【未必】是安全的;

    • 如果局部变量引用的对象没有引用线程共享的对象,那么是线程安全的;
    • 如果局部变量引用的对象引用了一个线程共享的对象,那么要考虑线程安全
  • 局部变量是线程安全的——每个方法都在对应线程的栈中创建栈帧,不会被其他线程共享。

20200608144636
  • 如果局部变量引用的对象被共享,且执行了读写操作,则线程不安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ClassUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for(int i=0; i<loopNumber; i++) {
method2();
method3();
}
}

private void method2() {
list.add("1");
}

private void method3() {
list.remove(0);
}
}
1
2
3
4
5
6
7
8
9
10
public class test {
public static void main(String[] args) throws InterruptedException {
ClassUnsafe test = new ClassUnsafe();
for(int i=0; i<2; i++){
new Thread(() -> {
test.method1(200);
},"thread "+i).start();
}
}
}

该程序的在运行过程中有可能会报错。因为多个线程会访问同一个 ArrayList<String> list 对象。

20200608144649
  • 如果是局部变量,则会在中创建对应的对象,不会存在线程安全问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ClassSafe {
public void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for(int i=0; i<loopNumber; i++) {
method2(list);
method3(list);
}
}

private void method2(ArrayList<String> list) {
list.add("1");
}

private void method3(ArrayList<String> list) {
list.remove(0);
}
}
20200608144702
3.3.3 常见线程安全类
  • String
  • Integer
  • StringBuffer
  • Random
  • Vector
  • Hashtable
  • java.util.concurrent 包下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。也可以理解为

  • 它们的每个方法是原子的
  • 但多个方法组合在一起就不是原子的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Hashtable table = new Hashtable();
// thread1, thread2
new Thread(()->{
if (table.get("key") == null) {
table.put("key",value1)
}
}).start();

new Thread(()->{
if (table.get("key") == null) {
table.put("key",value2)
}
}).start();

sequenceDiagram
    participant 线程1
    participant 线程2
    participant table
    
    table ->> 线程1 :get("key") == null
    table ->> 线程2 :get("key") == null
    线程2 ->> table :put("key", value2)
    线程1 ->> table :put("key", value1)
3.3.4 不可变类线程安全性

StringInteger 等都是不可变类,因为其内部的属性不可以改变,因此它们的方法都是线程安全的

思考:

Stringreplace()substring() 等方法可以改变值,那么这些方法又是如何保证线程安全的呢?

这是因为这些方法的返回值都创建了一个新的对象,而不是直接改变 StringInteger 对象本身。

3.3.5 习题

找出临界区代码,加锁

  • 买票:BiliBili - 黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程 买票问题
  • 转账:BiliBili - 黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程 转账问题

3.4 Monitor

3.4.0 Java 对象头

注意 ⚠️

个人认为,对于一个对象,当我们通过对象头信息的 【Mark Word区分(关于锁的)不同【对象类型】的时候,我们是通过【最后 2 bits 】来区分不同的锁信息的(NormalBiased 则需要最后 3 bits,因为其后两位都为 01)。

一个 Java 对象由【对象头】和【对象体】组成。以 32 bits 的虚拟机为例,

  • 普通对象:

    image-20220902142218758
    • 普通对象的 Java 对象头由 Mark WordKlass World 组成,一共 64 bits

    • Klass World:用来表示该对象的类型(如 String 等),是一个指针,用来找到类对象;

    • Mark Word 在不同对象类型下的结构如下:

    1. Normal 普通类型标记:hashcode(25 bits) 用来区分对象的哈希码;age(4 bits) GC 垃圾回收时的分代年龄;biased_lock(1 bit) 代表是否是偏向锁;最后 2 bits 代表该对象的加锁状态。
    2. Biased 偏向锁标记
    3. Ligthweight Locked 轻量级锁标记(最后 2 bits : 0 0):ptr_to_lock_record(30 bits) 指向 LockRecord 锁记录对象的指针;最后 2 bits 代表该对象的加锁状态。
    4. Heavyweight Locked 重量级锁标记:ptr_to_heavyweight_monitor(30 bits) 指向 Monitor 的指针;最后 2 bits 代表该对象的加锁状态。
    5. GC 垃圾回收标记:如果该对象已经被 GC 垃圾回收了,则将最后 2 bits 置为 11
    image-20220902142940973
  • 数组对象:数组对象在普通对象 (64 bits) 的基础上增加了 array length(32 bits) 数组长度。

image-20220902142343013
3.4.1 Monitor 原理

2022.09.04 更新:关于 ObjectMonitor 的底层源码分析

Monitor 又称为监视器或管程,,synchronized 实现的底层原理, 由【操作系统】提供

每个 Java 对象都可以关联一个 Monitor 对象:如果使用 synchronized 关键字给对象上锁(重量级)之后,该对象与 Monitor 关联。其中

  • 【上锁】的字节码表示是 monitorenter,将该对象的对象头中的 Mark Word 的前 30 bits 设置为为指向 Monitor 的指针;
  • 【解锁】的字节码表示是 monitorexit,将该 Owner 置为 null,唤醒 EntryList 里等待的线程。

20200608144917

  • 刚开始时, MonitorOwner 为空
  • 当线程执行到临界区代码时,如果使用了 synchronized,会先查询 synchronized 中所指定的对象(obj是否关联了 Monitor
    • 如果没有关联,则会先去去与 Monitor 关联,并且将 Owner 设为当前线程。
    • 如果已经关联,则会去查询该 Monitor 是否已经有了 Owner
      • 如果没有,则 Owner 与将当前线程绑定
      • 如果有,则放入 EntryList,进入阻塞状态(blocked
  • MonitorOwner 将临界区中代码执行完毕后,Owner 便会被清空,此时 EntryList 中处于阻塞状态的线程会被叫醒并竞争,此时的竞争是**【非公平的】**

注意 ⚠️:

  • 对象在使用了 synchronized 后与 Monitor 绑定时,会将对象头中的 Mark Word 的前 30 bits 设置为为指向 Monitor 的指针。
  • 每个对象都会绑定一个【唯一】的 Monitor,如果 synchronized 中所指定的对象(obj)不同,则会绑定不同的 Monitor
3.4.2 轻量级锁

我们之前已经知道,由 synchronized 关键字加锁的对象底层是通过 monitor 这种【重量级锁】实现的。那么当我们为了保证一个对象可以被互斥地访问而采取了 synchronized,但是这个对象在运行中不存在不同线程间竞争的关系。JVM 为此提供了一套对于 synchronized 的优化机制,以减小每次使用重量级锁的系统开销。

注意 ⚠️

个人认为,轻量级锁并不是一种【实体的锁】,而是一系列通过算法调度,优化 Monitor 的系统开销的”策略“。为了与 Monitor 这种重量级锁对应,我们才称其为轻量级锁。

【使用场景】当一个对象被多个线程所访问,但访问的时间是错开的(不存在竞争),此时就可以使用轻量级锁来优化。

因为重量级锁由操作系统提供,使用时系统开销较大,所以在使用 synchronized 时【优先】使用轻量级锁。轻量级锁对使用者是透明的,即语法依然是 【synchronized】, 优先使用轻量级锁,如若失败则改用其他类型。

(1)每次想要访问Object 时,都会创建锁记录 Lock Record对象。每个线程的栈帧都会包含一个锁记录对象,内部可以存储锁定对象的 Mark Word(不再一开始就使用 Monitor);

20200608144942

(2)让锁记录中的 Object reference 指向锁对象 Object,并尝试用 cas去替换 Object 中的 Mark Word,将此 Mark Word 放入 lock record 中保存;

这里提到的 cas 在后面的章节会详细介绍,这里只需了解 cas 操作目的是为了交换【对象头】的 Mark Word 与线程栈中【锁记录】的【地址 + 锁类型】信息。该操作是原子性的。

20200608144950

(3)如果 cas 替换成功,则将 Object***对象头的 Mark Word***替换为锁记录的地址和状态 00(轻量级锁状态),表示由该线程给对象加锁

20200608144957

(4)当一个【线程】想要访问一个加锁 【Object 】对象时,该线程会检查对象头 Mark Word 的最后 2 bits 是不是 01。若是,则为【步骤(3)】中的正常加锁流程;若是 00,则说明已经有线程持有了该 Object 的轻量级锁cas 失败。以下根据线程的不同,分为两种情况讨论:

  • 如果是【其他线程】已经持有了该 Object 的轻量级锁,即对象头中的【锁记录地址】不指向本线程。这时表明有竞争,进入下一小节的【锁膨胀】过程
  • 如果是【本线程】已经持有了该 Object 的轻量级锁,也即加锁 Object 对象头中的锁记录地址指向本线程。当再一次使用这个加锁的 Object 时,新创建一个为地址与状态码为 null的【锁记录】。此时我们称执行了 synchronized 【锁重入】,而【锁记录】的个数就是该线程Object 加锁的次数
image-20220326175731138

(5)当退出 synchronized 【解锁】时,如果有取值为 null 的锁记录,表示有锁重入。这时清除这个记录,表示【重入计数 -1】

20200608144957

(6)当退出 synchronized 【解锁】锁记录的值不为 null 时,这时使用 casMark Word 的值恢复给对象头:

  • 成功,则解锁成功;
  • 失败,说明轻量级锁进行了【锁膨胀】或者已经升级为【重量级锁】,进入重量级锁解锁流程。
3.4.3 锁膨胀

锁膨胀:轻量级锁 \stackrel{升级}{\longrightarrow} 重量级锁的过程。

(1)如果一个线程在给一个对象加轻量级锁时,cas 替换操作失败(因为此时其他线程已经给对象加了轻量级锁,Mark Word 的最后 2 bits00),此时该线程就会进入【锁膨胀】过程:

20200608145004

(2)此时便会给对象加上【重量级锁】(Monitor):

将对象头的 Mark Word 改为 Monitor 的地址,并且状态改为 10,进入【阻塞状态】

20200608145148

(3)当 Thread-0 解锁时,想使用 casMark Word 的值恢复给对象头,而此时的对象头里存放着 Monitor 的地址和状态码 10cas 失败。这时会进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置Ownernull,唤醒 EntryList 中阻塞等待的线程。

3.4.4 自旋优化

重量级锁竞争时,还可以使用自旋来优化(在多核心 cpu 中才有意义),如果当前线程在自旋成功(使用锁的线程退出了同步块,释放了锁,即当前线程可以直接成为新的 Owner),这时就可以避免线程进入阻塞状态。

自旋成功的情况
sequenceDiagram
    participant t1 as 线程1(CPU 0)
    participant obj as synchronized(Object)
    participant t2 as 线程2(CPU 1)
    
    Note over obj :状态码 01 (无锁)
    t1 ->> +obj :访问同步块, 获取 Monitor
    t1 ->> obj :成功加锁
    Note over obj :重量级锁指针 |状态码 10 (重量级锁)
    t2 -x obj :❌ 访问同步块, 获取 Monitor
    t2 ->> t2 :自旋重试
    Note over t1 :执行临界区代码块
    t2 ->> t2 :自旋重试
    obj ->> -t1 :成功解锁
    Note over obj :状态码 01 (无锁)
    t2 ->> +obj :成功加锁
    Note over obj :重量级锁指针 |状态码 10 (重量级锁)
    Note over t2 :执行临界区代码块
    obj ->> -t2 : 成功解锁
自旋重试失败的情况
sequenceDiagram
    participant t1 as 线程1(CPU 0)
    participant obj as synchronized(Object)
    participant t2 as 线程2(CPU 1)
    
    Note over obj :状态码 01 (无锁)
    t1 ->> +obj :访问同步块, 获取 Monitor
    t1 ->> obj :成功加锁
    Note over obj :重量级锁指针 |状态码 10 (重量级锁)
    t2 -x obj :❌ 访问同步块, 获取 Monitor
    t2 ->> t2 :自旋重试
    Note over t1 :执行临界区代码块
    t2 ->> t2 :自旋重试
    t2 ->> t2 :自旋重试
    t2 ->> t2 :自旋重试
    Note over t2 :阻塞
    obj ->> -t1 :成功解锁
    Note over obj :状态码 01 (无锁)

由以上两个流程我们可以看到,可以设置【线程自旋重试的次数】来控制该线程是否要阻塞。在 JDK 1.6 之后版本的次数时自适应的。在单 CPU 下的自旋重试没有意义

3.4.5 偏向锁

注意 ⚠️

个人理解,对于一个类的对象的偏向锁,是对于一个【类本身】的偏向,而不是对于【类的对象】的偏向,一个类只能有一个偏向锁。

轻量级锁在没有竞争时,每次【锁重入】(该线程执行的方法中再次锁住该对象)操作仍需要 cas 替换操作,这样是会使性能降低的。那么我们能不能减少 cas 操作呢?

所以引入了【偏向锁】对性能进行优化:在【第一次 cas】时会将【线程的 ID】写入对象的 Mark Word中。此后发现这个线程 ID 就是自己的,就表示没有竞争,就不需要再次 cas;以后只要不发生竞争,这个对象就归该线程所有。

相当于在临界资源上刻上进程的名字, 该【资源】归【此进程】所有

偏向状态

Mark Word 在不同对象类型下的结构如下:

image-20220902142940973
  1. Normal 普通类型标记(最后 3 bits : 0 0 1):
    • hashcode(25 bits) 用来区分对象的哈希码;
    • age(4 bits) GC 垃圾回收时的分代年龄;
    • biased_lock(1 bit) 代表是否是偏向锁;
    • 最后 2 bits 代表该对象的加锁状态。
  2. Biased 偏向锁标记(最后 3 bits : 1 0 1):
    • thread(23 bits) 用来存储获得偏向锁的(操作系统层面的)线程 ID
    • epoch(2 bits) 批量重偏向与批量撤销时用到;
    • age(4 bits) GC 垃圾回收时的分代年龄;
    • biased_lock(1 bit) 代表是否是偏向锁;
    • 最后 2 bits 代表该对象的加锁状态。
  3. Ligthweight Locked 轻量级锁标记(最后 2 bits : 0 0):ptr_to_lock_record(30 bits) 指向 LockRecord 锁记录对象的指针;最后 2 bits 代表该对象的加锁状态。
  4. Heavyweight Locked 重量级锁标记(最后 2 bits : 1 0):ptr_to_heavyweight_monitor(30 bits) 指向 Monitor 的指针;最后 2 bits 代表该对象的加锁状态。
  5. GC 垃圾回收标记(最后 2 bits : 1 1):如果该对象已经被 GC 垃圾回收了,则将最后 2 bits 置为 11

TIPS :

  • 如果开启了偏向锁(默认开启),在创建对象时,对象的 Mark Word 后三位应该是 1 0 1
  • 但是偏向锁默认是有延迟的,不会再程序一启动就生效,而是会在程序运行一段时间(几秒之后),才会对创建的对象设置为偏向状态。可以使用-XX:BiasedLockingStartupDelay=0 来禁用延迟;
  • 如果没有开启偏向锁,对象的 Mark Word 后三位应该是 0 0 1
撤销偏向

以下几种情况会使对象的偏向锁失效

  • 调用对象的 hashCode()方法时才会产生哈希码,本来用来存储偏向锁的【线程 ID】的空间就要用来存【哈希码】,所以偏向锁就失效了;
  • 【无竞争】的多个线程使用该对象(升级为【轻量级锁】),因为违背了偏向锁【只有一个线程使用该对象】的本意;
  • 调用了 wait() / notify() 方法(调用 wait() 方法会导致锁膨胀而使用【重量级锁】)
批量重偏向

如果对象虽然被多个线程访问,但是线程间不存在竞争,这时偏向 thread-1 的对象仍有机会重新偏向 thread-2。重偏向会重置 Thread ID

当对同一个加锁对象(而不是对象的实例)撤销重偏向超过超过阈值时,JVM 会觉得是不是偏向错了,此后再给对象加锁时,JVM 会将该对象重新偏向至该线程。

批量撤销

当撤销偏向锁的操作超过阈值以后,就会将整个类的对象都改为不可偏向的

【图解】

![截屏2022-09-02 19.29.43](2022-03-23-JUC-Java多线程-1/image-2022-09-02 19.29.43.png)

3.5 Wait() / Notify()

20200608144917
  • Owner 线程发现执行条件不满足,锁对象调用 wait()方法(obj.wait()),就会使当前线程进入 WaitSet 中,变为 WAITING / TIMEWAITING 状态。

  • 处于 BLOCKEDWAITING 状态的线程都为阻塞

    状态,CPU都不会分给他们时间片。但是有所区别:

    • *BLOCKED 状态的线程是在竞争锁对象时,发现 MonitorOwner 已经是别的线程了,此时就会进入 EntryList 中,并处于 BLOCKED 状态
    • *WAITING状态的线程是获得了对象的锁,但是自身因为某些原因需要进入阻塞状态时,锁对象调用了wait() 方法而进入了 WaitSet 中,处于 WAITING 状态
  • BLOCKED 状态的线程会在锁被释放的时候被唤醒,但是处于 WAITING 状态的线程只有被锁对象调用了notify()notifyAll()方法,才会被唤醒。

3.5.1 API介绍
  1. obj.wait():让进入 Monitor 的线程到 waitSet 等待
  2. obj.wait(long timeout) 让进入Monitor 的线程到 waitSet 等待 timeout时间长度,继续执行
  3. obj.notify():在 Monitor 上正在 waitSet 等待的线程中**【挑一个】**唤醒(虽然在 JVM 层面被描述成随机唤醒一个 WaitSet 中的线程,但是在源码实现方面,实际上仍然是唤醒第一个(先进先出))
  4. obj.notifyAll():在Monitor 上正在 waitSet 等待的线程**【全部】**唤醒

注意⚠️:他们都是线程之间进行协作的手段,都属于 Monitor对象的方法。必须获得此对象的锁,才能调用这几个方法。只有当对象被锁以后,才能调用 wait()notify() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static final Object lock = new Object ();
public static void main(){

new Thread (() -> {
synchronized (lock) {
try {
lock.wait(); // 把线程t1放在lock的WaitSet中,空出Monitor的Owner
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1").start();

new Thread (() -> {
synchronized (lock) {
try {
lock.wait(); // 把线程t2放在lock的WaitSet中,空出Monitor的Owner
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t2").start();

Thread.sleep(2000);
synchronized (lock) { // 主线程
lock.notify(); // 随机唤醒一个
lock.notifyAll(); // 唤醒所有
}
}
3.5.2 wait()sleep() 的区别
  • 不同点:

    1. 从 API 角度:sleep()Thread 的静态方法wait() 是所有对象(Object)的方法
    2. sleep() 【不用】与 synchronized 一起用,wait() 【需要】与 synchronized 一起用
    3. sleep() 不会释放锁wait() 在等待时会释放锁
  • 相同点:

    1. 进入的线程状态都是 TIMED_WAITING

为了解决 wait() 的虚假唤醒(因为 WaitSet 中有多个线程,我们无法精确指定唤醒哪个线程,所以我们使用 lock.notifyAll() 唤醒所有线程。而因为我们唤醒了所有 WaitSet 中的线程,其他线程应该被重新 wait()WaitSet 中,所以我们使用 while 重复执行 wait()),代码结构如下:while

1
2
3
4
5
6
7
8
9
10
11
synchronized (lock){
while (条件不成立) {
lock.wait();
}
// TODO
}

// 另一个线程
synchronized (lock){
lock.notifyAll();
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep4 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;

public static void main(String[] args) {


new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();

new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();

sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();
}
}

3.6 同步模式 之 保护性暂停

即 Guarded Suspension,用于一个线程等待另一个线程的执行结果时。如下图。

image-20220904170554996

要点:

  • 一个结果想从一个线程传递到另一个线程,可以让他们关联同一个 【GuardedObject
  • 如果有结果不断的从一个结果到另一个结果,那么可以使用**【消息队列】**(见消费者/生产者)
  • JDK 中,join()的实现、future 的实现,采用的就是该模式
  • 因为要等待另一方的结果,因此归类到同步模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Test2 {
public static void main(String[] args) {
String hello = "hello thread!";
Guarded guarded = new Guarded();

// thread 1
new Thread(()->{
System.out.println("想要得到结果");
synchronized (guarded) {
System.out.println("结果是:"+ guarded.getResponse());
}
System.out.println("得到结果");
}).start();

//thread 2
new Thread(()->{
System.out.println("设置结果");
synchronized (guarded) {
guarded.setResponse(hello);
}
}).start();
}
}

class Guarded {
/**
* 要返回的结果
*/
private Object response;

//优雅地使用wait/notify
public Object getResponse() {
//如果返回结果为空就一直等待,避免虚假唤醒
synchronized (this) {
while(response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return response;
}

public void setResponse(Object response) {
this.response = response;
synchronized (this) {
//唤醒休眠的线程
this.notifyAll();
}
}

@Override
public String toString() {
return "Guarded{" +
"response=" + response +
'}';
}
}

带超时判断的暂停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 修改上面代码块中的 getResponse() 方法
public Object getResponse(long time) {
synchronized (this) {
//获取开始时间
long currentTime = System.currentTimeMillis();
//用于保存已经等待了的时间
long passedTime = 0;
while(response == null) {
//看经过的时间-开始时间是否超过了指定时间
long waitTime = time -passedTime;
if(waitTime <= 0) {
break;
}
try {
//等待剩余时间
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获取当前时间
passedTime = System.currentTimeMillis()-currentTime
}
}
return response;
}
3.6.1 join() 的原理

join() 的原理:使用保护性暂停模式。一个线程等待另一个线程的结束。源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) { // 检查 等待时间 millis 是否符合要求
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) { // 如果该线程存活,则 wait(0) : 一直等下去
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
3.6.2 扩展 - Future 的原理

图中的 Future 就好比居民楼的信箱(每个信箱都有自己的编号),左边的 t0t2t4 就好比等待邮件的居民,右侧的 t1t3t5 就好比邮递员。

如果需要再多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅可以解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的处理。

image-20220327202425832
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class GuardedObject {
private int id;
public GuardedObject (int id){
this.id = id;
}
}

class Future {
private Map<Integer, GuardedObject> map = new HashTable<>();

private static int id = 0;
private static synchronized generateId() {
return id++;
}

public static GuardedObject createGuardedObject() {
GuardedObject obj = new GuardedObject(generateId());
map.put(obj.getId(), obj);
return obj;
}

public static Set<Integer> getId() {
return map.keySet();
}

public static GuardedObject getGuardedObject (int id) {
return map.remove(id);
}
}

public class Test {
public static void main(String[] args) {

Thread t1 = new Thread (() -> {
GuardedObject guardedObject = Future.createGuardedObject();
guardedObject.notifyAll();
}, "t1").start();

Thread t2 = new Thread(() -> {
while (Future.getGuardedObject(1) == null) {
guardedObject.wait();
}
GuardedObject guardedObject = Future.getGuardedObject(1);
}, "t2").start();
}
}

3.7 异步模式 之 生产者/消费者

3.7.1 要点
  • 与前面的保护性暂停中的 Guardobject 不同,不需要产生结果和消费结果的线程一一对应
  • 消息队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量 capacity 限制的,满时不会再加入数据(【生产者阻塞】),空时不会再消耗数据(【消费者阻塞】)
  • JDK 中各种阻塞队列,采用的就是这种模式

image-20220327204404520

如上图所示,左边的三个线程是生产者,右边的是消费者。

3.7.2 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 线程间通信的消息队列
class MessageQueue {
// 消息的队列集合,双向链表
private LinkedList<Message>list = new LinkedList();
// 队列容量
private int capacity;
// 构造函数
public MessageQueue (int capacity) {
this.capacity = capacity;
}

public Message take() { // 取消息
// 检查队列是否为空
synchronized(list){
while(list.isEmply()){
try {
System.out.println("消息队列为空,消费者等待\n");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头获取消息并返回
Message message = list.removeFrist();
System.out.println("已经消费一个消息\n" +message);
list.notifyAll(); // 目的是唤醒 WaitSet 中因为消息队列已满的【生产者】线程
return message;
}
}

public void put(Message message) { // 存消息
// 检查队列是否已满
synchronized(list){
while(list.size() == capcity) {
try {
System.out.println("消息队列已满,生产者等待\n");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
System.out.println("已经生产了一个消息:\n" +message);
list.notifyAll(); // 目的是唤醒 WaitSet 中因为消息队列已空的【消费者】线程
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final class Message { // 只能创建和读取,不能修改;且字类不能覆盖(final)
private int id;
private Object value;

public Message (int id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
return this.id;
}

public Object getValue() {
return this.value;
}

public String toString() {
return "Message: id:"+id+", value:"+value;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);

// 创建 3 个者线程
for (int i=1; i<3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "value"+id));
}, "生产者"+i).start();
}

// 创建 1 个消费者线程
new Thread(() -> {
while(true) {
Thread.sleep(1000);
Message massage = queue.take();
}
}, "消费者").start();
}

3.8 park() & unpark()

3.8.1 基本使用

park/unpark都是LockSupport类中的的方法

1
2
3
4
5
//暂停线程运行
LockSupport.park;

//恢复线程运行
LockSupport.unpark(Thread);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(()-> {
System.out.println("park");
//暂停线程运行
LockSupport.park();
System.out.println("resume");
}, "t1");
thread.start();

Thread.sleep(1000);
System.out.println("unpark");
//恢复线程运行
LockSupport.unpark(thread);
}
3.8.2 特点

park()/unpark()wait()/notify() 的区别

  • wait()notify()notifyAll() 必须配合 ObjectMonitor 一起使用,而 park()unpark() 是与线程相关。
  • park()unpark()以线程为单位来**【阻塞】【唤醒】**线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park()/unpark() 可以unpark(),而 wait()/notify() 不能先 notify()
  • park() 不会释放锁,而 wait() 会释放锁
3.8.3 原理

每个线程都有一个自己的 Parker 对象,并且该对象由 _counter_cond_mutex 等组成。

我们研究以下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Parker : public os::PlatformParker {
// 我们可以看到:它继承了os::PlatformParker,内置了一个 volatitle的 _counter。
// os::PlatformParker 的结构我们在下面介绍。
private:
// 表示许可
volatile int _counter ; // 计数器,是park用来判断是否暂停线程的核心依据,存在读写屏障
Parker * FreeNext ;
JavaThread * AssociatedWith ; // Current association 与当前Parker对象相关的Java线程
public:
Parker() : PlatformParker() {
//初始化_counter
_counter = 0 ;
FreeNext = NULL ;
AssociatedWith = NULL ;
}
protected:
~Parker() { ShouldNotReachHere(); }
public:
void park(bool isAbsolute, jlong time); // park 方法
void unpark(); // unpark 方法

// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;

};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class PlatformParker : public CHeapObj {
protected:
pthread_mutex_t _mutex [1] ; // 互斥变量类型
pthread_cond_t _cond [1] ; // 条件变量类型

public:
~PlatformParker() { guarantee (0, "invariant") ; }

public:
PlatformParker() {
int status;
// 初始化条件变量,使用 pthread_cond_t 之前必须先执行初始化
status = pthread_cond_init (_cond, NULL);
assert_status(status == 0, status, "cond_init”);
// 初始化互斥变量,使用 pthread_mutex_t 之前必须先执行初始化
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
}
}

此时我们再观察 park() 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
void Parker::park(bool isAbsolute, jlong time) {
if (_counter > 0) {
// 已经有许可了,用掉当前许可
_counter = 0 ;
// 使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到 0
OrderAccess::fence();
//立即返回
return ;
}

Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;

if (Thread::is_interrupted(thread, false)) {
// 线程执行了中断,返回
return;
}

if (time < 0 || (isAbsolute && time == 0) ) {
// 时间到了,或者是代表绝对时间,同时绝对时间是0(此时也是时间到了),直接返回,java中的parkUtil传的就是绝对时间,其它都不是
return;
}
if (time > 0) {
// 传入了时间参数,将其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(纳秒)存储起来,存的是绝对时间
unpackTime(&absTime, isAbsolute, time);
}

// 进入safepoint region,更改线程为阻塞状态.
ThreadBlockInVM tbivm(jt);

if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
// 如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回
return;
}

// 这里表示线程互斥变量锁成功了
int status ;
if (_counter > 0) {
// 有许可了,返回
_counter = 0;
//对互斥变量解锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
OrderAccess::fence();
return;
}

#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
// debug用
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

//将java线程所拥有的操作系统线程设置成 CONDVAR_WAIT状态 ,表示在等待某个条件的发生
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
//将java的_suspend_equivalent参数设置为true
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
if (time == 0) {
//把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。
//成功返回0,否则返回错误编号
status = pthread_cond_wait (_cond, _mutex) ;
} else {
//同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUT
status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
//WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1
//去除初始化
pthread_cond_destroy (_cond) ;
//重新初始化
pthread_cond_init (_cond, NULL);
}
}
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");

#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
//等待结束后,许可被消耗,改为0 _counter = 0 ;
//释放互斥量的锁
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
//加入内存屏障指令
OrderAccess::fence();
}

park 的实现可以看到

  1. 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
  2. 线程的等待与挂起、唤醒等等就是使用的 POSIX 的线程 API
  3. park 的许可通过变量 _count 实现,当被消耗时,_count 为 0,只要拥有许可,就会立即返回

unpark() 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void Parker::unpark() {
int s, status ;
//给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
//存储旧的_counter
s = _counter;
//许可改为1,每次调用都设置成发放许可
_counter = 1;
if (s < 1) {
//之前没有许可
if (WorkAroundNPTLTimedWaitHang) {
//默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
//释放锁
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
//一直有许可,释放掉自己加的锁,有许可park本身就返回了
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}

从源码可知 unpark 本身就是发放许可 _count,并通知等待的线程,已经可以结束等待了。

3.8.4 park / unpark 总结
  • 先调用 park 再调用 unpark

    1. 先调用 park
      • 线程运行时,会将Park对象中 _counter的值设为 0;
      • 调用 park 时,会先查看 _counter的值是否为 0,如果为 0,则将线程放入阻塞队列 _cond
      • 放入阻塞队列中后,会再次_counter设置为 0
    20200608145250
    1. 然后调用 unpark
      • 调用 unpark 方法后,会将_counter的值设置为 1
      • 去唤醒阻塞队列_cond中的线程
      • 线程继续运行并将_counter的值设为 0
    20200608145303
  • 先调用 unpark,再调用 park

    1. 调用 unpark

      • 会将_counter设置为 1(运行时 0)
    2. 调用 park 方法

      • 查看_counter是否为 0
      • 因为 unpark 已经把_counter设置为 1,所以此时将_counter设置为 0,但不放入阻塞队列 _cond
20200608145313

3.9 线程状态转换

20200608144621
情况一:t.start() : NEW --> RUNNABLE
  • 当调用了 Thread t.start() 方法时,Java 中的 Thread 对象就和操作系统的线程关联起来了。
  • NEW –> RUNNABLE
情况二:wait/notify : RUNNABLE <--> WAITING
  • 当调用了 t 线程用 synchronized(obj) 获取了对象锁后
    • 调用 obj.wait() 方法时, t 线程从 RUNNABLE –> WAITING
    • 调用 obj.notify()obj.notifyAll()t.interrupt()
      • 竞争 Monitor 锁【成功】,t 线程从 WAITING –> RUNNABLE
      • 竞争 Monitor 锁【失败】,t 线程从 WAITING –> BLOCKED
情况三:t.join() : RUNNABLE <--> WAITING
  • 当前线程

    调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING

    • 注意是当前线程t 线程对象的监视器上等待
  • t 线程运行结束,或调用了当前线程interrupt() 时,当前线程从 WAITING –> RUNNABLE

情况四:park/unpark : RUNNABLE <--> WAITING
  • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE
情况五: wait/notify (long n) : RUNNABLE <--> TIMED_WAITING

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING
  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify()obj.notifyAll()t.interrupt()
    • 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
    • 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED
情况六:t.join(long n) : RUNNABLE <--> TIMED_WAITING
  • 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING
    • 注意是当前线程在t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE
情况七:Thread.sleep(long n) : RUNNABLE <--> TIMED_WAITING
  • 当前线程调用 Thread.sleep(long n),当前线程从 RUNNABLE –> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE
情况八:parkNanos/unpark (long n) : RUNNABLE <--> TIMED_WAITING
  • 当前线程调用 LockSupport.parkNanos(long nanos)LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING –> RUNNABLE
情况九:阻塞在EntryList : RUNNABLE <--> BLOCKED
  • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED
  • obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中t 线程竞争成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED
情况十: 执行完毕 : RUNNABLE --> TERMINATED

当前线程所有代码运行完毕,进入 TERMINATED

3.10 多把锁

将锁的粒度细分

例如一套房子中有多个房间,书房和卧室的功能毫不相干。所以我们可以把锁分别加在卧室和书房上。

1
2
3
4
5
class BigRoom {
//额外创建对象来作为锁
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
}

3.11 线程的活跃性

3.11.1 死锁

定义:

在并发环境下,各进程因竞争资源而造成的一种互相等待对方手里的资源,导致各进程都阻塞,都无法向前推进的现象,就是死锁。

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁

如:

  • t1线程 获得 A对象锁,接下来想获取 B对象的锁
  • t2线程 获得 B对象锁,接下来想获取 A对象的锁

每个人都占有一个资源,同时又在等待另一个人手里的资源。发生“死锁”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
final Object A = new Object();
final Object B = new Object();
new Thread(()->{
synchronized (A) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B) {
System.out.println("TODO");
}
}
}).start();

new Thread(()->{
synchronized (B) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (A) {
System.out.println("TODO");
}
}
}).start();
}
发生死锁的必要条件
  1. 互斥条件:只有对必须互斥使用的资源的争抢才会产生死锁。

  2. 不可剥夺条件:进程所获得的资源在未使用完之前,不能由其他进程强行夺走

  3. 请求和保持条件:进程已经保持了至少一个资源,但又提出了新的资源请求,而该资源又被其他进程占有,此时请求进程被阻塞,但又对自己已有的资源保持不放

  4. 循环等待条件:存在一种进程资源的循环等待链,链中的每一个进程已获得的资源同时被下一个进程所请求。死锁 \to 一定有循环等待; 循环等待【不一定】发生死锁

【预防死锁】: 破坏死锁产生的必要条件,即可预防死锁。

避免死锁的方法

在线程使用锁对象时**,顺序加锁**即可避免死锁。线程1按顺序获得A,B对象的锁:

sequenceDiagram
    participant t1 as 线程1
    participant t2 as 线程2
    participant obj1 as 对象1
    participant obj2 as 对象2
    
    t1 -->> obj1 : 尝试获取锁
    Note over t1,obj1 :拥有锁
    
    t2 --x obj1 : 尝试获取锁(失败)
    t2 -->> obj1 : 阻塞
    
    t1 -->> obj2 : 尝试获取锁
    Note over t1,obj2 :拥有锁
3.11.2 活锁

活锁出现在两个线程互相改变对方的结束条件后谁也无法结束。

避免活锁的方法

在线程执行时,中途给予不同的间隔时间即可。

死锁与活锁的区别
  • 死锁是因为线程互相持有对象想要的锁,并且都不释放,最后到时线程阻塞停止运行的现象。
  • 活锁是因为线程间修改了对方的结束条件,而导致代码一直在运行,却一直运行不完的现象。
3.11.3 饥饿

某些线程因为优先级太低,导致一直无法获得资源的现象。

为了避免死锁而使用顺序加锁时,可能会出现饥饿现象

3.11.4 死锁、饥饿和死循环的异同点
共同点 区别
死锁 都是进程无法顺利向前推进的现象 死锁一定是“循环等待对方手里的资源”导致的,因此如果有死锁现象,那至少有两个或两个以上的进程同时发生死锁。另外,发生死锁的进程一定处于阻塞态
饥饿 可能只有一个进程发生饥饿。发生饥饿的进程可能是阻塞态(如长期得不到需要的IO设备),也可能是就绪态(长期得不到处理机)
死循环 可能只有一个进程发生死循环。死循环可以是运行态,只不过无法继续推进。死锁和饥饿是由于操作系统分配资源的策略不合理导致的,而死循环是由代码逻辑错误导致的。死锁和饥饿是管理者(操作系统)的问题,死循环是被管理者的问题。
3.11.5 预防死锁 - 静态策略
3.11.5.1 资源分配图(Graphe d’allocation)
  • 两种结点
    • 进程结点:对应一个进程
    • 资源结点:对应一类资源,一类资源可能有多个。(一般用矩形代表资源结点,矩形中的小圆圈代表该类资源的数量)
  • 两种边:
    • 进程结点 -> 资源结点:请求边,表示进程想申请几个资源(一条边代表一个)
    • 资源结点 -> 进程结点:分配边,表示已经为进程分配了几个资源
graph LR
  A(P1) --> B
  B[R2:2] --> C
  C(P2) --> D
  D[R1:3] --> C
  D --> A
  D --> A

如果系统中剩余的可用资源数足够满足进程的需求,那么这个进程暂时是不会阻塞的,可以顺利地执行下去。

graph LR;
  A(P1) 
  B[R2:2] --> C
  C(P2) --> D
  D[R1:3] --> C

如果这个进程执行结束了把资源归还系统,就可能使某些正在等待资源的进程被激活,并顺利地执行下去。
相应的,这些被激活的进程执行完了之后又会归还一些资源,这样可能又会激活另外一些阻塞的进程….

graph TB;
  A(P1) 
  B[R2:2]
  C(P2)
  D[R1:3]

如果按上述过程分析,最终能消除所有边,就称这个图是可完全简化的。此时一定没有发生死锁(相当于能找到一个安全序列)

另一个例子:

graph LR;
  A(P1)
  B[R2:2]
  C(P2)
  D[R1:3]
  
  E(P3)
  E --> B
  A --> B
  A --> B
  B --> C
  C --> D
  D --> C
  D --> A
  D --> A

如果最终不能消除所有边,那么此时就是发生了死锁最终没有被消除的边所连的进程就发生了死锁

3.11.5.2 死锁的解除

用死锁检测算法化简资源分配图后,还连着边的那此进程就是死锁进程。 解除死锁的主要方法有:

  1. 资源剥夺法。挂起(暂时放到外存上)某些死锁进程,并抢占它的资源,将这些资源分配给其他的死锁进程。但是应防止被挂起的进程长时间得不到资源而饥饿。
  2. 撤销进程法(或称终止进程法)。强制撤销部分、甚至全部死锁进程,并剥夺这些进程的资源,这种方式的优点是实现简单,但所付出的代价可能会很大。因为有些进程可能已经运行了很长时间,己经接近结束了,旦被终止可谓功亏一篑,以后还得从头再来。
  3. 进程回退法。让一个或多个死锁进程回退到足以避免死锁的地步。这就要求系统耍记录进程的历史信息,设置还原点
3.11.6 避免死锁 - 动态策略
3.11.6.1 安全序列

所谓安全序列,就是值如果系统按照这种序列分配资源,则每个进程都能顺利完成。只要能找出一个安全序列,系统就是安全状态。当然,安全序列可能有多个
如果分配了资源之后,系统中找不出任何一个安全序列,系统就进入了不安全状态。这就意味着之后可能所有进程都无法顺利的进行下去。当然,如果有进程提前归还了一些资源,那系统也有可能重新回到安全状态,不过我们在分配资源之前总是要考虑到最坏的情况。

因此可以在资源分配之前前预判这次分配是否会导致系统进入不安全状态,一次决定是否答应资源分配的请求。这也是“银行家算法”的核心思想。

3.11.6.2 银行家算法

假设系统中有 n 个进程,m 种资源每个进程在运行前先声明对各种资源的最大需求数,则可用一个 n*m的矩阵(可用二维数组实现) 表示所有进程对各种资源的最大需求数。不妨称为最大需求矩阵 MaxMax[i,j] = K 表示进程 P_i 最多需要 K 个资源 R_j。同理,系统可以用一个 n*m分配矩阵 Allocation 表示对所有进程的资源分配情况。Max - Allocation = Need 矩阵,表示各进程最多还需要多少各类资源。
另外,还要用一个长度为 m 的一维数组 Available(...) 表示当前系统中还有多少可用资源
某进程 PiP_i 向系统申请资源,可用1个长度为m的一维数组 Request(...), 表示本次申请的各种资源量

进程 最大需求(Max 矩阵) 已分配(Allocation 矩阵) 最多还需要(Need 矩阵)
P0 (7,5,3) (0,1,0) (7,4,3)
P1 (3,2,2) (2,0,0) (1,2,2)
P2 (9,0,2) (3,0,2) (6,0,0)
P3 (2,2,2) (2,1,1) (0,1,1)
P4 (4,3,3) (0,0,2) (4,3,1)

可用银行家算法预判本次分配是否会导致系统进入不安全状态:

  1. 如果 Request_i[j] < Need[i,j] (0 <= j < m) 便转向 ②:否则认为出错
  2. 如果 Request_i[j] < Available[i,j] (0 <= j < m),便转向 ③:否则表示尚无足够资源,p必须等待
  3. 系统试探着把资源分配给进程 PiP_i,并修改相应的数据(并非真的分配,修改数值只是为了做预判):
    • Available = Available - Request_j;
    • Allocation[i,j] = Allocation[i,j] + Request_i[j];
    • Need[i,j] = Need[i,j] - Request_i[j]
  4. 操作系统执行安全性算法,检查此次资源分配后,系统是否处于安全状态。若安全,才正式分配;否则,恢复相应数据,让进程阻塞等待。

【银行家算法步骤】

  1. 检查此次申请是否超过了之前声明的最大需求数
  2. 检查此时系统剩余的可用资源是否还能满足这次请求
  3. 试探着分配,更改各数据结构
  4. 用安全性算法检查此次分配是否会导致系统进入不安全状态

安全性算法步骤: 检查当前的剩余可用资源是否能满足某个进程的最大需求,如果可以,就把该进程加入安全序列,并把该讲程持有的资源全部回收。 不断重复上述过程,看最终是否能让所有进程都加入安全序列。

3.12 ReentrantLock (可重入锁)

源码分析:Java中的锁:ReentrantLock

synchronized 相比具有的的特点

  • 可被别的线程中断(例如 A,B 两个线程中,A 线程拥有锁,B 线程中断 A 线程后,锁就被释放了)
  • 可以设置超时时间(给定时间内如果争抢不到锁,就放弃争抢)
  • 可以设置为公平锁 (先到先得)
  • 支持多个条件变量(相当于具有多个 WaitSet
3.12.1 基本语法
1
2
3
4
5
6
7
8
9
10
11
12
// 获取 ReentrantLock 对象
private ReentrantLock lock = new ReentrantLock();
// 加锁。思考为什么lock()要在try代码块的外面?
// 因为如果在try块内,线程在lock()的时候出现异常,会执行finally代码块中的unlock()方法,
// 但是此时并没有成功lock(),程序会出错。
lock.lock();
try {
// 需要执行的代码
} finally {
// 释放锁
lock.unlock();
}
3.12.2 可重入
  • 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
  • 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock(); // 锁
try {
log.debug("execute method1");
method2(); // 调用method2
}
finally {
lock.unlock();
}
}
public static void method2() {
lock.lock(); // 锁的重入
try {
log.debug("execute method2");
method3(); // 调用method3
}
finally {
lock.unlock();
}
}
public static void method3() {
lock.lock(); // 锁的重入
try {
log.debug("execute method3");
}
finally {
lock.unlock();
}
}

输出:

1
2
3
17:59:11.862 [main] c.TestReentrant - execute method1
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3
3.12.3 可打断

可打断的意义:被动避免死等

如果某个线程 ReentrentLock lock.lockInterruptibly() 处于阻塞状态(等待获得锁),可以调用其 interrupt() 方法让其停止阻塞,放弃获得锁。

简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ReentrantLock lock = new ReentrantLock(); 

Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
// 如果没有竞争,那么此方法就会获得lock对象锁
// 如果有竞争就进入阻塞队列,可以被其他进程用 interrupt 方法打断
lock.lockInterruptibly(); // 可中断锁
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try { // 思考这里为什么要用两个try代码块?可以写在一起吗?
log.debug("获得了锁");
}
finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt(); // 执行中断
log.debug("执行打断");
}
finally {
lock.unlock();
}

输出:

1
2
3
4
5
6
7
8
9
18:02:40.520 [main] c.TestInterrupt - 获得了锁 
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr onizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断

注意如果是不可中断模式,那么即使被 interrupt() 也不会让等待中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
log.debug("启动...");
lock.lock(); // 普通锁
try {
log.debug("获得了锁");
}
finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt(); // 执行中断
log.debug("执行打断");
sleep(1);
}
finally {
log.debug("释放了锁");
lock.unlock();
}

输出:

1
2
3
4
5
18:06:56.261 [main] c.TestInterrupt - 获得了锁
18:06:56.265 [t1] c.TestInterrupt - 启动...
18:06:57.266 [main] c.TestInterrupt - 执行打断 // 这时 t1 并没有被真正打断, 而是仍继续等待锁
18:06:58.267 [main] c.TestInterrupt - 释放了锁
18:06:58.267 [t1] c.TestInterrupt - 获得了锁
3.12.4 锁超时

锁超时的意义:主动避免死等

使用 lock.tryLock() 方法会返回获取锁是否成功。如果成功则返回 true,反之则返回 false

并且 tryLock() 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit),其中 timeout 为最长等待时间,TimeUnit 为时间单位

简而言之就是:获取失败了、获取超时了或者被打断了,不再阻塞,直接返回。

例:不设置等待时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ReentrantLock lock = new ReentrantLock(); 

Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
}
finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
}
finally {
lock.unlock();
}

输出:

1
2
3
18:15:02.918 [main] c.TestTimeout - 获得了锁
18:15:02.921 [t1] c.TestTimeout - 启动...
18:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回

例:设置等待时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(()-> {
try {
//判断获取锁是否成功,最多等待1秒
if(!lock.tryLock(1, TimeUnit.SECONDS)) {
System.out.println("获取失败");
//获取失败,不再向下执行,直接返回
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
//被打断,不再向下执行,直接返回
return;
}
System.out.println("得到了锁");
try {
// TODO : 临界区代码
} finally {
lock.unlock();
}
});


lock.lock();
try{
t1.start();
//打断等待
t1.interrupt();
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
3.12.5 公平锁

在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。但会降低并发度。

1
2
// 默认是不公平锁,需要在创建时指定为公平锁
ReentrantLock lock = new ReentrantLock(boolean fair);
3.12.6 条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

使用要点:

  • await() 前需要获得锁
  • await() 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static ReentrantLock lock = new ReentrantLock();
// 创建新的条件变量(休息室)
static Condition condition1 = lock.newCondition();
static Condition condition2 = lock.newCondition();

public static void main (String[] args) {
lock.lock();

condition1.await(); // 进入“休息室”等待

condition1.signal(); // 从“休息室”唤醒

condition1.signalAll();// 从“休息室”唤醒所有
}

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep4 {
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
static ReentrantLock ROOM = new ReentrantLock();
static Condition waitCigaretteSet = new Condition();
static Condition waitTakeOutSet = new Condition();

public static void main(String[] args) {

new Thread(() -> {
ROOM.lock();
try{
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
waitCigaretteSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小南").start();

new Thread(() -> {
ROOM.lock();
try {
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
waitTakeOutSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("可以开始干活了");
} finally {
ROOM.unlock();
}
}, "小女").start();

Thread.sleep(1000);
new Thread(() -> {
ROOM.lock();
try {
hasTakeout = true;
log.debug("外卖到了噢!");
waitTakeOutSet.signal();
} finally {
ROOM.unlock();
}
}, "送外卖的").start();

Thread.sleep(1000);
new Thread(() -> {
ROOM.lock();
try {
hasTakeout = true;
log.debug("烟到了噢!");
waitCigaretteSet.signal();
} finally {
ROOM.unlock();
}
}, "送烟的").start();
}
}

运行结果:

1
2
3
4
5
6
15:34:20.249 c.Test24	[小女] - 外卖送到没? [false]
15:34:20.260 c.Test24 [小女] - 没外卖,先歇会!
15:34:20.303 c.Test24 [小南] - 有烟没?[false]
15:34:20.303 c.Test24 [小南] - 没烟,先歇会!
15:34:21.238 c.Test24 [小女] - 可以开始干活了
15:34:22.241 c.Test24 [小南] - 可以开始干活了

3.13 同步模式 之 顺序控制

3.13.1 固定运行顺序

比如先打印“2”,再打印“1”

wait / notify 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final Object LOCK = new Object();
//判断先执行的内容是否执行完毕
static Boolean t2Runned = false;
public static void main(String[] args) {

new Thread(()->{
synchronized (LOCK) {
while (!t2Runned) { // 判断 2 是否打印过
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("1");
}
}, "t1").start();

new Thread(()->{
synchronized (LOCK) {
System.out.println("2");
t2Runned = true;
LOCK.notify(); //执行完毕,唤醒所有等待线程
}
}, "t2").start();
}
park / unpark方法
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main (String[] args) {

Thread t1 = new Thread (() -> {
LockSupport.park();
System.out.println("1");
}, "t1").start();

Thread t2 = new Thread (() -> {
System.out.println("2");
LockSupport.unpark(t1);
}, "t2").start();

}
3.13.2 交替输出

线程 1 输出“a“ 5 次,线程 2 输出 ”b“ 5 次,线程 3 输出 ”c“ 5 次。现在要求输出 abcabcabcabcabc 怎么实现?

wait / notify 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* 
输出内容 等待标记 下一个标记
a 1 2
b 2 3
c 3 1
*/
class WaitSymbol {
private int flag; // 等待标记
private int loopNum; // 循环次数

public WaitSymbol (int flag, int loopNum) {
this.flag = flag;
this.loopNum = loopNum;
}

public void print(String str, int waitFlag, int nextFlag){
for (int i=0; i<loopNum; i++) {
synchronized(this) {
while(flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
WaitSymbol ws = new WaitSymbol(1,5);

Thread t1 = new Thread (() -> {
ws.print("a", 1, 2);
}, "t1").start();

Thread t2 = new Thread (() -> {
ws.print("b", 2, 3);
}, "t2").start();

Thread t3 = new Thread (() -> {
ws.print("c", 3, 1);
}, "t3").start();
}

输出结果:

1
abcabcabcabcabc
ReentrantLock 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class AwaitSignal extends ReentrantLock{
private int loopNum = 5;

public AwaitSignal (int loopNum) {
this.loopNum = loopNum;
}

// 参数1:打印内容 参数2:进入哪一间休息室 参数3:下一间休息室
public void print(String str, Condition current, Condition next) {
for(int i=0; i<loopNumber; i++) {
lock();
try {
current.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static AwaitSignal awaitSignal = new AwaitSignal();
static Condition conditionA = awaitSignal.newCondition();
static Condition conditionB = awaitSignal.newCondition();
static Condition conditionC = awaitSignal.newCondition();

public static void main(String[] args) {

Thread t1 = new Thread(() -> {
awaitSignal.print("a", conditionA, conditionB);
}, "t1").start();

Thread t2 = new Thread(() -> {
awaitSignal.print("b", conditionB, conditionC);
}, "t2").start();

Thread t3 = new Thread(() -> {
awaitSignal.print("c", conditionC, conditionA);
}, "t3").start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
awaitSignal.lock();
try {
System.out.print("开始...");
conditionA.signal(); // 唤醒一个等待的线程
} finally {
awaitSignal.unlock();
}
}

输出结果:

1
abcabcabcabcabc
park / unpark 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ParkUnpark {
private loopNum;

public ParkUnpark(int loopNum) {
this.loopNum = loopNum;
}

public void print(String str, thread next) {
for (int i=0; i<loopNum; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(next);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> {
pu.print("a", t2);
}, "t1").start();

t2 = new Thread(() -> {
pu.print("b", t3);
}, "t2").start();

t3 = new Thread(() -> {
pu.run("c", t1);
}, "t3").start();

LockSupport.unpark(t1);
}

输出结果:

1
abcabcabcabcabc

3.14 经典问题:哲学家就餐问题

image-20220327221126928

圆桌上坐着5名哲学家,每两个哲学家之问的桌上摆一根筷子,桌子的中间是一碗米饭。哲学们倾注毕生的精力用于思考和进餐,哲学家在思考时,并不影响他人。只有当哲学家饥饿时才试图拿起左、右两根筷子(一根一根地拿起)。如果筷子已在他人手上,则需等待。饥饿的哲学家只有同时拿起两根筷子才可以开始进餐。当进餐完毕后,放下筷子继续思考。

  1. 关系分析。系统中有5个哲学家进程,5位哲学家与左右邻居对其中间筷子的访问互斥关系
  2. 整理思路。这个问题中只有互斥关系,但与之前遇到的问题不同的事,每个哲学家进程需要同时持有两个临界资源才能开始吃饭。如何避免临界资源分配不当造成的死锁现象,是哲学家问题的精髓。
  3. 信号量设置。定义互斥信号量数组chopstick [5] = {1,1,1,1,1}用于实现对5个筷子的互斥访问。并对哲学家按0~4编号,哲学家i左边的筷子编号为i,右边的筷子编号为(i+1) % 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Semaphore chopstick[5] = new Semaphore(1);
Semaphore mutex = new Semaphore(1);
Philosoph_i () { // i号哲学家进程
while(true) {
chopstick[i].down(); // 拿起左边筷子
chopstick[(i+1)%5].down(); // 拿起右边筷子
吃饭;
chopstick[i].up(); // 放下左边筷子
chopstick[(i+1)%5].up(); // 放下右边筷子
思考;
}
/*
这种情况下,所有哲学家都会拿起左边的筷子,会造成死锁
*/
}

**注意:**这种情况下,所有哲学家都会拿起左边的筷子,会造成 死锁。这种解决方案不合理。

如何防止死锁的发生呢?

  1. 可以对哲学家进程施加一些限制条件,比如最多允许四个哲学家同时进餐。这样可以保证至少有一个哲学家是可以拿到左右两只筷子
  2. 要求奇数号哲学家先拿左边的筷子,然后再拿右边的筷子,而偶数号哲学家刚好相反。用这种方法可以保证如果相邻的两个奇偶号哲学家都想吃饭,那么只会有其中一个可以拿起第一只筷子,另一个会直接阻塞。这就避免了占有一支后再等待另一只的情况。
  3. 当且仅当一个哲学家左右两只筷子都可用时才允许他挂起筷子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Semaphore chopstick[5] = new Semaphore(1);
Semaphore mutex = new Semaphore(1); // 互斥地取筷子
/*
实现方法3.
*/
Philosoph_i () { // i号哲学家进程
while(true) {
mutex.down(); // 拿两个筷子的锁
chopstick[i].down(); // 拿起左边筷子
chopstick[(i+1)%5].down(); // 拿起右边筷子
mutex.up();
吃饭;
chopstick[i].up(); // 放下左边筷子
chopstick[(i+1)%5].up(); // 放下右边筷子
思考;
}
}

总结:

  • 这些进程之间只存在互斥关系,但是与之前接触到的互斥关系不同的是,每个进程都需要同时持有两个临界资源,因此就有“死锁”问题的隐患。
  • 如果在考试中遇到了一个进程需要同时持有多个临界资源的情况,应该参考哲学家问题的思想,分析题中给出的进程之间是否会发生循环等待,是否会发生死锁
  • 可以参考哲学家就餐问题解决死锁的三种思路

4 共享模型之 内存

上一章讲解的 Monitor 主要关注的是访问共享变量时,保证临界区代码的原子性。这一章我们将进一步学习共享变量在多线程间的【可见性】问题与多条指令执行时的【有序性】问题。

本章节单独作为一篇:Java 内存模型 - JMM,点击跳转。

5 共享模型之 无锁

在本章中我们会研究一种相对于 Monitor(悲观锁)来实现线程安全的模式(乐观锁/无锁)。

本章节单独作为一篇:Java中的锁:乐观锁,点击跳转。

6 并发工具

在本章中我们会研究一些 Java 提供的并发工具,主要分为以下三个部分:

  1. 线程池使用及分析
  2. java.util.concurrent 并发工具包的使用及源码分析
  3. 第三方并发工具