我们可以思考如下场景:线程是一个系统资源,操作系统每分配一个线程,就会消耗一定的资源。如果在高并发的环境下, 我们为每个任务都创建一个线程的话,那么对资源的消耗时非常大的。为了减小在这中环境下的系统消耗,我们引入线程池概念来维护多个线程,将任务分配给线程池中的线程。

什么是线程池?
线程池其实就是一种多线程处理的形式,处理过程中可以将任务添加到到队列中,然后在创建线程后自动启动这些任务。

使用线程池的优势:

  • 使用线程池可以统一的管理线程和控制线程并发数量;
  • 可以与任务分离,提升线程重用度;
  • 提升系统的响应速度

我们主要通过一下几个部分来介绍 Java 中的线程池:

  1. 自定义线程池
  2. JDK 提供的线程池:ThreadPoolExecutor
  3. 设计模式 - 工作线程

一、自定义线程池

我们在介绍 JDK 提供的线程池之前,先自定义一个线程池,以更好的了解线程池的工作方式。

我们可以回忆一下生产者消费者问题,线程池其实就符合这个模型。线程池中的线程 thread 就是消费者,阻塞队列中的任务 task 就是商品,产生任务的线程 main 就是生产者。如下图

image-20220926124624697

下面我们分别实现上述部分。

1 阻塞队列 BlockingQueue

在这种“生产者消费者”模型下,生产与消费 task 的速率很可能是不一致的,生产的太多而来不及消费。此时我们就需要阻塞队列 BlockingQueue 来储存这些“产品”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
public class MyBlockingQueue<T> {
// 1.任务队列(双向链表)
private Deque<T> queue = new ArrayDeque<>();

// 2.任务队列的容量
private int capacity;

// 3.锁:多个线程都在队列头部竞争task,所以需要锁保证互斥性
private ReentrantLock lock = new ReentrantLock();

// 4.生产者条件变量:当队列中的task数量达到容量时,生产者阻塞等待
private Condition fullWaitSet = lock.newCondition();

// 5.消费者条件变量:当队列中没有task时,消费者阻塞等待
private Condition emptyWaitSet = lock.newCondition();

// 构造方法
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
}

// 阻塞获取
public T take() {
lock.lock(); // 加锁
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await(); // 当任务队列为空时,消费者阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst(); // 任务队列不为空时,取得队列头元素,并将其从队列中移除
System.out.println("从队列中取出一个任务" + t);
System.out.println("队列中的元素有:\t" + queue.toString());
fullWaitSet.signal(); // 消费了一个元素,唤醒阻塞的生产者
return t;
} finally{
lock.unlock(); // 解锁
}
}

// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock(); // 加锁
try {
long nanos = unit.toNanos(timeout); // 将timeout统一转换成纳秒
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
// 更新剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos); // 任务队列为空时消费者阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst(); // 任务队列不为空时,取得队列头元素,并将其从队列中移除
fullWaitSet.signal(); // 消费了一个元素,唤醒阻塞的生产者
return t;
} finally{
lock.unlock(); // 解锁
}
}

// 阻塞添加
public void put (T element){
lock.lock(); // 加锁
try {
while (queue.size() == capacity) {
try {
System.out.println(element + " 等待加入任务队列...");
fullWaitSet.await(); // 当任务队列已满时,生产者阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element); // 任务队列为满时,将element添加到队尾
System.out.println("队列中的元素有:\t" + queue.toString());
emptyWaitSet.signal(); // 生产了一个元素,唤醒阻塞的消费者
} finally {
lock.unlock(); // 解锁
}
}

// 带超时的阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock(); // 加锁
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
System.out.println(task + " 等待加入任务队列...");
if (nanos <= 0) {
System.out.println(task + " 等待时间超时,放弃!");
return false;
}
// 更新剩余等待时间
nanos = fullWaitSet.awaitNanos(nanos); // 任务队列已满时,生产者阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task); // 任务队列为满时,将element添加到队尾
System.out.println("队列中的元素有:\t" + queue.toString());
emptyWaitSet.signal(); // 生产了一个元素,唤醒阻塞的消费者
return true;
} finally {
lock.unlock(); // 解锁
}
}

// 获取大小
public int size() {
lock.lock(); // 加锁
try {
return queue.size();
} finally {
lock.unlock(); // 解锁
}
}

public void tryPut(MyThreadPool.RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否已满
if (queue.size() == capacity) {
// 队列满时执行的策略,将权利下放
rejectPolicy.reject(this, task);
} else {
queue.addLast(task); // 任务队列为满时,将element添加到队尾
System.out.println("队列中的元素有:\t" + queue.toString());
emptyWaitSet.signal(); // 生产了一个元素,唤醒阻塞的消费者
}
} finally {
lock.unlock();
}
}
}

2 线程池 ThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class MyThreadPool {
// 任务队列
private MyBlockingQueue<Runnable> taskQueue;

// 线程集合
private HashSet<MyThread> threads = new HashSet<>();

// 核心线程数
private int coreNum;

// 获取任务的超时时间与时间单位
private long timeout;
private TimeUnit timeUnit;

// 构造方法
public MyThreadPool (int coreNum, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.coreNum = coreNum;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new MyBlockingQueue<>(queueCapacity);
}

// 执行任务
public void excute(Runnable task) {
// 如果任务数没有超过coreNum时,可以直接交给MyThread对象执行;
if (threads.size() < coreNum) {
Mythread t = new MyThread(task);
threads.add(t);
t.start();
} else {
// 如果超过了coreNum,则加入任务队列taskQueue
// 这里列出一些加入任务队列的策略:
taskQueue.put(task);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MyThread extends Thread {
private Runnable task;

public MyThread(Runnable task) {
this.task = task;
}

// 无超时时间版本 take
@Override
public void run() {
// 我们思考一下,这里的MyThread对象需要执行哪些任务?
// 1. MyThread初始化时传入的task对象
// 2. 不断地从任务对列taskQueue中获取任(take)
while(task != null || (task = taskQueue.take()) != null ) {
try {
System.out.println(Thread.currentThread() + " 正在执行任务:"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null; // 传入的task执行完毕
}
}
// 如果既没有传入对象,taskQueue也为空,那么我们就将MyThread对象从threads集合中移除
synchronized(threads) {
System.out.println("没有任务可以执行,从任务队列中移除 MyThread 对象 " + this);
threads.remove(this);
}
}
}

在使用无超时时间版本中,当任务数量较多时,产生任务的主线程就会阻塞。我们想根据不同的策略,让主线程进行不同的处理(继续死等或有时限的等待)。我们引入拒绝策略 RejectPolicy,来增强线程池,使其可以使用不同的策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyThreadPool {
// 任务队列
private MyBlockingQueue<Runnable> taskQueue;

// 线程集合
private HashSet<MyThread> threads = new HashSet<>();

// 核心线程数
private int coreNum;

// 获取任务的超时时间与时间单位
private long timeout;
private TimeUnit timeUnit;

// 策略模式:当等待队列已满的策略
private RejectPolicy<Runnable> rejectPolicy;

// 构造方法
public MyThreadPool (int coreNum, long timeout, TimeUnit timeUnit,
int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreNum = coreNum;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new MyBlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}

// 执行任务
public void excute(Runnable task) {
synchronized (threads) {
// 如果任务数没有超过coreNum时,可以直接交给MyThread对象执行;
if (threads.size() < coreNum) {
MyThread t = new MyThread(task);
System.out.println("新增 MyThread 对象" + t + "\t 来执行任务:" + task);
threads.add(t);
t.start();
} else {
// 如果超过了coreNum,则加入任务队列taskQueue
System.out.println("将任务:" +task + " 加入任务队列");
taskQueue.tryPut(rejectPolicy, task);
// 这里列出一些加入任务队列的策略(策略模式):
// 1 死等
// 2 带超时时间的等待
// 3 让调用者放弃任务执行
// 4 让调用者抛出异常
// 5 让调用者自己执行
}
}
}
}
1
2
3
4
5
// 拒绝策略的函数式接口
@FunctionalInterface
interface RejectPolicy<T> {
void reject(MyBlockingQueue<T> taskQueue, T task);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 测试1:死等
@Test
public void testMain() {
MyThreadPool myThreadPool = new MyThreadPool(1,1000, TimeUnit.MILLISECONDS, 1,
(queue, task) -> {
// (1) 死等
// queue.put(task);
// (2) 带超时时间的等待
// queue.offer(task,500, TimeUnit.MILLISECONDS);
// (3) 让调用者放弃任务执行
// System.out.println("放弃任务:" + task);
// (4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败:" + task);
// (5) 让调用者自己(主线程)执行
// task.run();
});

for (int i = 0; i < 3; i++) {
int j = i;
myThreadPool.excute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
} );
}
}

二、JDK 提供的线程池

1 ThreadPoolExecutor

这一小节,我们开始研究 JDK 给我提供的线程池 ThreadPoolExecutor

1.1 线程池状态

ThreadPoolExecutor 使用 int高 3 位来表示线程池状态(最高位是符号位),低 29 位表示线程数量

1
2
3
4
5
6
// 高3位:表示当前线程池运行状态,除去高3位之后的低位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示在 ctl 中,低 COUNT_BITS 位,是用于存放当前线程数量的位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 低 COUNT_BITS 位所能表达的最大数值,000 11111111111111111111 => 5亿多
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

四种状态:

1
2
3
4
5
6
7
8
9
10
// 111 000000000000000000,转换成整数后其实就是一个【负数】
private static final int RUNNING = -1 << COUNT_BITS;
// 000 000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
状态名 高 3 位 是否接收新任务 是否处理阻塞队列任务 说明
RUNNING 111 初始状态
SHUTDOWN 000 不会接收新任务,但会继续处理阻塞队列中剩余的任务
STOP 001 中断当前任务,并抛弃处理阻塞队列中的任务
TIDYING 010 - - 过渡状态。任务执行完毕,即将进入终结状态
TERMINATED 011 - - 终结状态

1.2 构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • int corePoolSize:核心线程数(最多保留的线程数)
  • int maximumPoolSize:最大线程数
  • long keepAliveTime:针对救急线程的生存时间
  • TimeUnit unit:生存时间的时间单位
  • BlockingQueue<Runnable> workQueue:阻塞队列
  • ThreadFactory threadFactory:线程工厂,可以在创建线程对象时起名,以区分不同的线程种类(例如区分线程属于线程池或是其他类型)
  • RejectedExecutionHandler handler:拒绝策略

【图解】

(1)线程池中核心线程的创建是懒惰式的,即只有当用到时才会创建。最大线程数 maximumPoolSize = 救急线程数 + 核心线程数 corePoolSize

image-20220927140901687

(2)随着任务的不断增加,核心线程都被分配了任务,且阻塞队列已满,那么新的任务就会被阻塞。此时就是救急线程工作的时候。

image-20220927135229558 image-20220927135937852

(3)救急线程存在生存时间,执行完任务后,一段时间内(keepAliveTime)没有使用,就会被销毁。

image-20220927140254949

(4)当救急线程和核心线程都被分配了任务,且阻塞队列也已经满了的情况下,才会执行拒绝策略 handler

image-20220927140717178

(5)JDK 提供的拒绝策略:

  • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略;

    1
    public static class AbortPolicy implements RejectedExecutionHandler
  • CallerRunsPolicy:让调用者执行任务;

    1
    public static class CallerRunsPolicy implements RejectedExecutionHandler 
  • DiscardPolicy:放弃本次任务;

    1
    public static class DiscardPolicy implements RejectedExecutionHandler 
  • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之。

    1
    public static class DiscardOldestPolicy implements RejectedExecutionHandler

1.3 newFixedThreadPool

固定大小线程池:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

【特点】

  • 核心线程数 = 最大线程数,即没有救急线程;
  • 阻塞队列通过 LinkedBlockingQueue 实现,是无界的,可存放任意数量的线程对象。

1.4 newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

【特点】

  • 核心线程数为 0,最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60 秒,意味着
    1. 线程池中的线程全部都是救急线程
    2. 救急线程可以无限创建。
  • 阻塞队列通过 SynchronousQueue 实现,没有容量。意为只要有任务就用救急线程执行。

1.5 newSingleThreadExecutor

单线程线程池

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

【特点】

  • 只有一个核心线程,多个任务排队执行。任务执行完毕后,该核心线程也不会被释放。
  • new Thread 创建一个线程串行执行的区别:如果线程在执行任务时由于意外结束,线程池会重新创建一个线程继续执行队列里的任务。
  • FinalizableDelegatedExecutorService 应用装饰器模式,只对外暴露了 ExecutorService 接口,不能调用 ThreadPoolExecutor 的特有方法(修改线程数等)

1.6 执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务
public void execute(Runnable command);

// 执行(提交)任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 执行(提交) tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 执行(提交) tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 执行(提交) tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny (Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 执行(提交) tasks 中所有任务,哪个任务先成功执行完牛,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny (Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
(1) execute()

对于第一种最普通的执行任务的方法 execute(task),我们可以看到它时没有返回值的;

1
2
// 执行任务
public void execute(Runnable command);

例:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testSubmit() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);

Future<String> future = threadPool.submit(
() -> {Thread.sleep(1000); return "ok";}
);

// 主线程阻塞在此,直到获得future返回对象
System.out.println(future.get());
}
(2)submit()

对于 submit() 方法是有返回值的,可以只用 Future 对象来接收返回值

1
2
// 执行(提交)任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testSubmit() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);

Future<String> future = threadPool.submit(() -> {
try {
Thread.sleep(1000);
return "ok";
} catch (InterruptedException e) {
e.printStackTrace();
}
});

// 主线程阻塞在此,直到获得future返回对象
System.out.println(future.get());
}

输出:

1
ok
(3)invokeAll()

invokeAll() 方法的输入是一个任务的集合返回结果的集合

1
2
3
4
5
6
7
8
// 执行(提交) tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 执行(提交) tasks 中所有任务,带超时时间。超时后丢弃任务。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void testInvokeAll() throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);

List<Future<String>> future = threadPool.invokeAll(Arrays.asList(
() -> { Thread.sleep(1000); return "thread_1";},
() -> { Thread.sleep(500); return "thread_2";},
() -> { Thread.sleep(3000); return "thread_3";}
));

// 主线程阻塞在此,直到获得future返回对象
for (Future<String> f:future) {
System.out.println(f.get());
}
}

输出:

1
2
3
thread_1
thread_2
thread_3
(4)invokeAny()

invokeAny() 方法的输入是一个任务的集合返回第一个执行结束的任务的结果,其他任务取消

1
2
3
4
5
6
7
8
// 执行(提交) tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny (Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 执行(提交) tasks 中所有任务,带超时时间。超时后丢弃任务。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testInvokeAny() throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(2);

String result = threadPool.invokeAny(Arrays.asList(
() -> { Thread.sleep(1000); return "thread_1";},
() -> { Thread.sleep(500); return "thread_2";},
() -> { Thread.sleep(3000); return "thread_3";}
));

// 主线程阻塞在此,直到获得future返回对象
System.out.println(result);
}

输出:

1
thread_2

1.7 关闭线程池

(1)shutdown()

停止线程池,线程池状态变为 SHUTDOWN

  • 不会接受新任务
  • 会把已提交的任务和队列中的任务执行完毕
  • 调用时不会阻塞调用线程(主线程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取线程池全局锁
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为 SHUTDOWN,如果线程池状态大于 SHUTDOWN,就不会设置直接返回
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 空方法,子类可以扩展
onShutdown();
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
tryTerminate();
}
(2)shutdownNow()

直接关闭线程池,不会等待任务执行完成。线程池状态变为 STOP

  • 不会接受新任务
  • 将队列中的任务丢弃
  • 使用 interrupt 方法打断正在执行的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public List<Runnable> shutdownNow() {
// 返回值引用
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取线程池全局锁
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为STOP
advanceRunState(STOP);
// 中断线程池中【所有线程】
interruptWorkers();
// 从阻塞队列中导出未处理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}

tryTerminate();
// 返回当前任务队列中 未处理的任务。
return tasks;
}
(3)tryTerminate()

设置为 TERMINATED 状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void tryTerminate() {
for (;;) {
// 获取 ctl 的值
int c = ctl.get();
// 线程池正常,或者有其他线程执行了状态转换的方法,当前线程直接返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
// 线程池是 SHUTDOWN 并且任务队列不是空,需要去处理队列中的任务
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

// 执行到这里说明线程池状态为 STOP 或者线程池状态为 SHUTDOWN 并且队列已经是空
// 判断线程池中线程的数量
if (workerCountOf(c) != 0) {
// 【中断一个空闲线程】,在 queue.take() | queue.poll() 阻塞空闲
// 唤醒后的线程会在getTask()方法返回null,
// 执行 processWorkerExit 退出逻辑时会再次调用 tryTerminate() 唤醒下一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 池中的线程数量为 0 来到这里
final ReentrantLock mainLock = this.mainLock;
// 加全局锁
mainLock.lock();
try {
// 设置线程池状态为 TIDYING 状态,线程数量为 0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 结束线程池
terminated();
} finally {
// 设置线程池状态为TERMINATED状态。
ctl.set(ctlOf(TERMINATED, 0));
// 【唤醒所有调用 awaitTermination() 方法的线程】
termination.signalAll();
}
return;
}
} finally {
// 释放线程池全局锁
mainLock.unlock();
}
}
}

三、设计模式 - 工作线程

1 定义

有限的工作线程Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式

例如,

一个餐馆的服务员们(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了 (对比另一种多线程设计模式:Thread-Per- Message)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

例如,

如果服务员们(线程)既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)。 显然效率不高,分成服务员(线程池A) 与厨师(线程池B) 更为合理。

2 饥饿现象

【注意 ⚠️】要与线程的“死锁与饥饿” 的概念区分开。固定大小线程池会有饥饿现象。

例如,

两位服务员(线程)是同一个线程池中的两个线程。他们要做的事情是:为客人点餐到后厨做菜,这是两个阶段的工作:

  • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
  • 后厨做菜:做菜。

比如服务员 A 处理了点餐任务,接下来它要等着服务员 B 把菜做好,然后上菜。但现在同时来了两个客人,这个时候服务员 A服务员 B 都去处理点餐了,这时没人做饭了,这时发生了饥饿现象。

不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。

3 线程池的大小

线程池过小】会导致程序不能充分地利用系统资源、容易导致饥饿

线程池过大】会导致更多的线程上下文切换,占用更多内存,容易 OOM

3.1 CPU 密集型运算

通常采用

CPU+1CPU 核心数 + 1

能够实现最优的 CPU 利用率,+1+1 是保证当线程由于页缺失故障(操作系统) 或其他原因导致暂停时,额外的这个线程就能发挥作用,保证 CPU 时钟周期不被浪费。

3.2 I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行 I/O 操作时、远程 RPC 调用(HTTP socket)时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下:

线=×CPU×(=CPU+)÷CPU线程数 = 核心数 \times 期望 CPU 利用率 \times 总时间_{(=CPU计算时间+等待时间)} \div CPU 计算时间

例如 4 核 CPU 计算时间是 50%,其它等待时间是 50%,期望 CPU 被 100% 利用,套用公式

1
4 * 100% * 100% / 50% = 8

例如 4 核 CPU 计算时间是 10%,其它等待时间是 90%,期望 CPU 被 100% 利用,套用公式

1
4 * 100% * 100% / 10% = 40