我们可以思考如下场景:线程是一个系统资源,操作系统每分配一个线程,就会消耗一定的资源。如果在高并发的环境下, 我们为每个任务都创建一个线程的话,那么对资源的消耗时非常大的。为了减小在这中环境下的系统消耗,我们引入线程池概念来维护多个线程,将任务分配给线程池中的线程。
什么是线程池?
线程池其实就是一种多线程处理的形式,处理过程中可以将任务添加到到队列中,然后在创建线程后自动启动这些任务。
使用线程池的优势:
使用线程池可以统一的管理线程和控制线程并发数量;
可以与任务分离,提升线程重用度;
提升系统的响应速度
我们主要通过一下几个部分来介绍 Java 中的线程池:
自定义线程池
JDK 提供的线程池:ThreadPoolExecutor
设计模式 - 工作线程
一、自定义线程池
我们在介绍 JDK 提供的线程池之前,先自定义一个线程池,以更好的了解线程池的工作方式。
我们可以回忆一下生产者消费者问题 ,线程池其实就符合这个模型。线程池中的线程 thread
就是消费者 ,阻塞队列中的任务 task
就是商品 ,产生任务的线程 main
就是生产者 。如下图
下面我们分别实现上述部分。
1 阻塞队列 BlockingQueue
在这种“生产者消费者”模型下,生产与消费 task
的速率很可能是不一致的,生产的太多而来不及消费。此时我们就需要阻塞队列 BlockingQueue
来储存这些“产品”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 public class MyBlockingQueue <T> { private Deque<T> queue = new ArrayDeque <>(); private int capacity; private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); public MyBlockingQueue (int capacity) { this .capacity = capacity; } public T take () { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); System.out.println("从队列中取出一个任务" + t); System.out.println("队列中的元素有:\t" + queue.toString()); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public void put (T element) { lock.lock(); try { while (queue.size() == capacity) { try { System.out.println(element + " 等待加入任务队列..." ); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(element); System.out.println("队列中的元素有:\t" + queue.toString()); emptyWaitSet.signal(); } finally { lock.unlock(); } } public boolean offer (T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capacity) { try { System.out.println(task + " 等待加入任务队列..." ); if (nanos <= 0 ) { System.out.println(task + " 等待时间超时,放弃!" ); return false ; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(task); System.out.println("队列中的元素有:\t" + queue.toString()); emptyWaitSet.signal(); return true ; } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut (MyThreadPool.RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity) { rejectPolicy.reject(this , task); } else { queue.addLast(task); System.out.println("队列中的元素有:\t" + queue.toString()); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
2 线程池 ThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class MyThreadPool { private MyBlockingQueue<Runnable> taskQueue; private HashSet<MyThread> threads = new HashSet <>(); private int coreNum; private long timeout; private TimeUnit timeUnit; public MyThreadPool (int coreNum, long timeout, TimeUnit timeUnit, int queueCapacity) { this .coreNum = coreNum; this .timeout = timeout; this .timeUnit = timeUnit; this .taskQueue = new MyBlockingQueue <>(queueCapacity); } public void excute (Runnable task) { if (threads.size() < coreNum) { Mythread t = new MyThread (task); threads.add(t); t.start(); } else { taskQueue.put(task); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class MyThread extends Thread { private Runnable task; public MyThread (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.take()) != null ) { try { System.out.println(Thread.currentThread() + " 正在执行任务:" +task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (threads) { System.out.println("没有任务可以执行,从任务队列中移除 MyThread 对象 " + this ); threads.remove(this ); } } }
在使用无超时时间版本中,当任务数量较多时,产生任务的主线程就会阻塞。我们想根据不同的策略,让主线程进行不同的处理(继续死等或有时限的等待)。我们引入拒绝策略 RejectPolicy
,来增强线程池,使其可以使用不同的策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class MyThreadPool { private MyBlockingQueue<Runnable> taskQueue; private HashSet<MyThread> threads = new HashSet <>(); private int coreNum; private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public MyThreadPool (int coreNum, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this .coreNum = coreNum; this .timeout = timeout; this .timeUnit = timeUnit; this .taskQueue = new MyBlockingQueue <>(queueCapacity); this .rejectPolicy = rejectPolicy; } public void excute (Runnable task) { synchronized (threads) { if (threads.size() < coreNum) { MyThread t = new MyThread (task); System.out.println("新增 MyThread 对象" + t + "\t 来执行任务:" + task); threads.add(t); t.start(); } else { System.out.println("将任务:" +task + " 加入任务队列" ); taskQueue.tryPut(rejectPolicy, task); } } } }
1 2 3 4 5 @FunctionalInterface interface RejectPolicy <T> { void reject (MyBlockingQueue<T> taskQueue, T task) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Test public void testMain () { MyThreadPool myThreadPool = new MyThreadPool (1 ,1000 , TimeUnit.MILLISECONDS, 1 , (queue, task) -> { }); for (int i = 0 ; i < 3 ; i++) { int j = i; myThreadPool.excute(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(j); } ); } }
二、JDK
提供的线程池
1 ThreadPoolExecutor
这一小节,我们开始研究 JDK
给我提供的线程池 ThreadPoolExecutor
。
1.1 线程池状态
ThreadPoolExecutor
使用 int
的高 3 位 来表示线程池状态 (最高位是符号位 ),低 29 位表示线程数量 。
1 2 3 4 5 6 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;
四种状态:
1 2 3 4 5 6 7 8 9 10 private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
状态名
高 3 位
是否接收新任务
是否处理阻塞队列任务
说明
RUNNING
111
是
是
初始状态
SHUTDOWN
000
否
是
不会接收新任务,但会继续处理阻塞队列中剩余的任务
STOP
001
否
否
中断当前任务,并抛弃处理阻塞队列中的任务
TIDYING
010
-
-
过渡状态。任务执行完毕,即将进入终结状态
TERMINATED
011
-
-
终结状态
1.2 构造方法
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
int corePoolSize
:核心线程数(最多保留的线程数)
int maximumPoolSize
:最大线程数
long keepAliveTime
:针对救急线程的生存时间
TimeUnit unit
:生存时间的时间单位
BlockingQueue<Runnable> workQueue
:阻塞队列
ThreadFactory threadFactory
:线程工厂,可以在创建线程对象时起名,以区分不同的线程种类 (例如区分线程属于线程池或是其他类型)
RejectedExecutionHandler handler
:拒绝策略
【图解】
(1)线程池中核心线程的创建是懒惰式 的,即只有当用到时才会创建。最大线程数 maximumPoolSize
= 救急线程数 + 核心线程数 corePoolSize
。
(2)随着任务的不断增加,核心线程都被分配了任务,且阻塞队列已满 ,那么新的任务就会被阻塞 。此时就是救急线程 工作的时候。
(3)救急线程存在生存时间 ,执行完任务后,一段时间内(keepAliveTime
)没有使用,就会被销毁。
(4)当救急线程和核心线程都被分配了任务 ,且阻塞队列也已经满了 的情况下,才会执行拒绝策略 handler
。
(5)JDK
提供的拒绝策略:
AbortPolicy
:让调用者抛出 RejectedExecutionException
异常,默认策略;
1 public static class AbortPolicy implements RejectedExecutionHandler
CallerRunsPolicy
:让调用者执行任务;
1 public static class CallerRunsPolicy implements RejectedExecutionHandler
DiscardPolicy
:放弃本次任务;
1 public static class DiscardPolicy implements RejectedExecutionHandler
DiscardOldestPolicy
:放弃队列中最早的任务,本任务取而代之。
1 public static class DiscardOldestPolicy implements RejectedExecutionHandler
1.3 newFixedThreadPool
固定大小线程池:
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
【特点】
核心线程数 = 最大线程数,即没有救急线程;
阻塞队列通过 LinkedBlockingQueue
实现,是无界的,可存放任意数量的线程对象。
1.4 newCachedThreadPool
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
【特点】
核心线程数为 0,最大线程数是 Integer.MAX_VALUE
,救急线程的空闲生存时间是 60 秒,意味着
线程池中的线程全部都是救急线程 ;
救急线程可以无限创建。
阻塞队列通过 SynchronousQueue
实现,没有容量。意为只要有任务就用救急线程执行。
1.5 newSingleThreadExecutor
单线程线程池
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
【特点】
只有一个核心线程,多个任务排队执行。任务执行完毕后,该核心线程也不会被释放。
与 new Thread
创建一个线程串行执行的区别:如果线程在执行任务时由于意外结束,线程池会重新创建一个线程继续执行队列里的任务。
FinalizableDelegatedExecutorService
应用装饰器模式 ,只对外暴露了 ExecutorService
接口,不能调用 ThreadPoolExecutor
的特有方法(修改线程数等)
1.6 执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) ;<T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
(1) execute()
对于第一种最普通的执行任务的方法 execute(task)
,我们可以看到它时没有返回值的;
1 2 public void execute (Runnable command) ;
例:
1 2 3 4 5 6 7 8 9 10 11 @Test public void testSubmit () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(2 ); Future<String> future = threadPool.submit( () -> {Thread.sleep(1000 ); return "ok" ;} ); System.out.println(future.get()); }
(2)submit()
对于 submit()
方法是有返回值的,可以只用 Future
对象来接收返回值
1 2 <T> Future<T> submit (Callable<T> task) ;
例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Test public void testSubmit () { ExecutorService threadPool = Executors.newFixedThreadPool(2 ); Future<String> future = threadPool.submit(() -> { try { Thread.sleep(1000 ); return "ok" ; } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(future.get()); }
输出:
(3)invokeAll()
invokeAll()
方法的输入 是一个任务的集合 ,返回结果的集合 :
1 2 3 4 5 6 7 8 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void testInvokeAll () throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newFixedThreadPool(2 ); List<Future<String>> future = threadPool.invokeAll(Arrays.asList( () -> { Thread.sleep(1000 ); return "thread_1" ;}, () -> { Thread.sleep(500 ); return "thread_2" ;}, () -> { Thread.sleep(3000 ); return "thread_3" ;} )); for (Future<String> f:future) { System.out.println(f.get()); } }
输出:
1 2 3 thread_1 thread_2 thread_3
(4)invokeAny()
invokeAny()
方法的输入 是一个任务的集合 ,返回第一个执行结束的任务的结果,其他任务取消 :
1 2 3 4 5 6 7 8 <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
例如
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void testInvokeAny () throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(2 ); String result = threadPool.invokeAny(Arrays.asList( () -> { Thread.sleep(1000 ); return "thread_1" ;}, () -> { Thread.sleep(500 ); return "thread_2" ;}, () -> { Thread.sleep(3000 ); return "thread_3" ;} )); System.out.println(result); }
输出:
1.7 关闭线程池
(1)shutdown()
停止线程池,线程池状态变为 SHUTDOWN
不会接受新任务
会把已提交的任务和队列中的任务执行完毕
调用时不会阻塞调用线程(主线程)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
(2)shutdownNow()
直接关闭线程池,不会等待任务执行完成。线程池状态变为 STOP
不会接受新任务
将队列中的任务丢弃
使用 interrupt
方法打断正在执行的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
(3)tryTerminate()
设置为 TERMINATED
状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
三、设计模式 - 工作线程
1 定义
让有限的工作线程 (Worker Thread
)来轮流异步处理无限多的任务 。也可以将其归类为分工模式,它的典型实现就是线程池 ,也体现了经典设计模式中的享元模式 。
例如,
一个餐馆的服务员们(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了 (对比另一种多线程设计模式:Thread-Per- Message)
注意,不同任务类型应该使用不同的线程池 ,这样能够避免饥饿,并能提升效率。
例如,
如果服务员们(线程)既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)。 显然效率不高,分成服务员(线程池A) 与厨师(线程池B) 更为合理。
2 饥饿现象
【注意 ⚠️】要与线程的“死锁与饥饿” 的概念区分开。固定大小线程池 会有饥饿现象。
例如,
两位服务员(线程)是同一个线程池中的两个线程。他们要做的事情是:为客人点餐 和到后厨做菜 ,这是两个阶段的工作:
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待 ;
后厨做菜:做菜。
比如服务员 A 处理了点餐任务,接下来它要等着服务员 B 把菜做好,然后上菜。但现在同时来了两个客人 ,这个时候服务员 A 和服务员 B 都去处理点餐了,这时没人做饭了,这时发生了饥饿现象。
不同任务类型应该使用不同的线程池 ,这样能够避免饥饿,并能提升效率。
3 线程池的大小
【线程池过小 】会导致程序不能充分地利用系统资源、容易导致饥饿
【线程池过大 】会导致更多的线程上下文切换,占用更多内存,容易 OOM
3.1 CPU 密集型运算
通常采用
C P U 核 心 数 + 1 CPU 核心数 + 1
C P U 核 心 数 + 1
能够实现最优的 CPU 利用率,+ 1 +1 + 1 是保证当线程由于页缺失故障(操作系统) 或其他原因导致暂停时,额外的这个线程就能发挥作用,保证 CPU 时钟周期不被浪费。
3.2 I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行 I/O 操作时、远程 RPC 调用(HTTP socket
)时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:
线 程 数 = 核 心 数 × 期 望 C P U 利 用 率 × 总 时 间 ( = C P U 计 算 时 间 + 等 待 时 间 ) ÷ C P U 计 算 时 间 线程数 = 核心数 \times 期望 CPU 利用率 \times 总时间_{(=CPU计算时间+等待时间)} \div CPU 计算时间
线 程 数 = 核 心 数 × 期 望 C P U 利 用 率 × 总 时 间 ( = C P U 计 算 时 间 + 等 待 时 间 ) ÷ C P U 计 算 时 间
例如 4 核 CPU 计算时间是 50%,其它等待时间是 50%,期望 CPU 被 100% 利用,套用公式
1 4 * 100 % * 100 % / 50 % = 8
例如 4 核 CPU 计算时间是 10%,其它等待时间是 90%,期望 CPU 被 100% 利用,套用公式
1 4 * 100 % * 100 % / 10 % = 40