本文介绍 jdk 线程池原理
主流程
线程池主流程
状态控制
线程池的状态使用一个原子操作的变量 ctl 控制
1 2 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));
ctl 包含了两个参数:
runState:线程池状态
workerCount:工作的线程数
ctl 的结构:32 位整型,低 29 位存 workerCount,高 3 位存
runState,例如:
RUNNING:111-xxx
SHUTDOWN:000-xxx
STOP:001-xxx
TIDYING:010-xxx
TERMINATED:011-xxx
可以看到:RUNNING < SHUTDOWN < STOP < TIDYING <
TERMINATED,明确状态之间的关系非常重要
如果用两个变量分别表示 A 和
B,在维护两者关系时,需要加锁保证两者的一致性
举个例子,在一个时刻先读 A 再读 B,代码中如果不加锁,可能读 A
的时候,B 的值被修改了,那么 A 和 B 的状态就不一致了;
为了避免对 A 和 B 加锁,使用一个原子变量代表 A 和 B,可以保障每次读 A
和 B 的值 状态是一致的
那么 A 和 B
哪个放在高位呢?应该将表示状态、分类的放高位,这样方便用比较大小来判断状态,不需要取出高位
生命周期
线程池状态
RUNNING:运行状态,线程池可以接收新的任务,可以处理已添加到任务队列中的任务
SHUTDOWN:关闭状态,当调用 shutdown()
方法后,线程池进入此状态。此时,线程池不再接收新任务,已存在任务队列中的任务继续执行直到他们全部完成
STOP:停止状态,当调用 shutdownNow()
方法后,线程池进入此状态。此时,线程池不再接收新任务,并尽力中断正在执行的任务,清空任务队列
TIDYING:整理状态,当所有任务都已结束,并且所有 worker 线程(除了
finalizer 线程)都已结束,线程池从 SHUTDOWN 或 STOP
状态转换到此状态,并执行 terminated() 方法进行清理工作
TERMINATED:在 terminated()
方法执行完,线程池进入此状态。此时,线程池终止,所有资源被释放,生命周期结束
结合上面状态之间的关系,可以用下面表达式来判断线程池一些行为:
state >= SHUTDOWN:线程池不再接收新任务
state >= STOP:线程池不再执行任务,任务队列为空
state < STOP:线程池可以执行任务
核心类
ThreadPoolExecutor
管理线程池的核心类
重要概念
线程池:为了复用线程来执行多个任务,以减少线程创建和销毁的开销,提高性能
任务队列:用于存放待执行的任务,当有线程空闲时,会从队列中取出任务进行执行
主要属性
corePoolSize: 核心线程数
maximumPoolSize:最大线程数,线程池容量
keepAliveTime:线程空闲等待时间
workQueue:等待队列/任务队列
works:工作线程集合
threadFactory:线程工厂,用于创建线程(包括核心线程和非核心线程)
handler:拒绝策略
构造方法:
1 2 3 4 5 6 7 8 ThreadPoolExecutor executor = new ThreadPoolExecutor ( 5 , 10 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue <Runnable>(), new ThreadPoolExecutor .AbortPolicy() );
关键方法在后面分析
Worker
工作线程,负责从任务队列中获取任务并执行,其设计目标是高效地管理任务调度和执行
本文代码基于 jdk 19.0.1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } }
类分析:
首先 Worker一个 Runnable 类,说明 Worker 可以被一个线程执行
Worker 又是一个 AbstractQueuedSynchronizer(AQS)类
AQS
是一个并发编程的框架,定义了一些模版方法,子类实现这些方法来指定锁行为,来实现独占锁或共享锁;比如
jdk 提供的实现类,如独占锁 ReentrantLock,共享锁
CountDownLatch、Semaphore
为什么 Worker 是一个 AQS 呢?可以看下 Worker 的抢锁方式:
1 2 3 4 5 6 7 protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; }
可以看出,Worker 是一个独占 & 不可重入锁
独占是为了只有一个线程才能来执行 Worker
的任务,在执行过程中,其他线程不能来重复执行任务;不可重入即一个线程不能重复执行任务
构造方法:
1 2 3 4 5 Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); }
核心类之间的关系
worker.drawio
核心方法
submit
向线程池提交一个任务
1 2 3 4 5 6 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; }
执行的事情:
封装任务对象
调用 execute 执行任务
execute
执行一个任务
要解决的事情:
怎么创建线程:核心 or 非核心
什么情况要放入等待队列
什么情况要拒绝任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
流程分析:
如果工作线程数(workerCount)小于
corePoolSize,直接创建核心线程执行任务
如果 workerCount 大于等于 corePoolSize,向任务队列放入任务
二次检查线程池运行状态,如果 workerCount 等于
0,则创建一个非核心工作线程
如果向任务队列投放任务失败(任务队列已经满了),尝试创建非核心线程执行任务
如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务
线程池流程.drawio
addWorker
功能:创建一个工作线程
输入:任务,是否创建核心线程
输出:true 创建工作线程成功
代码走读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false ; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN)) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null )) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException (); workers.add(w); workerAdded = true ; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
上面对于边界判断比较复杂,这里先分析主要流程:
通过 CAS 的方式,尝试申请一个位置,分配给要创建的工作线程
创建工作线程
加锁,将工作线程加入到线程集合中,更新最大峰值,释放锁
启动线程,执行任务
启动失败,将现场从集合中移除,释放位置
疑问:这里创建的线程怎么没有区分是核心线程还是非核心线程,核心线程不是要多停留一段时间再释放吗?
答:不需要区分一个线程是核心的还是非核心的,只要控制线程数量就可以了,即:如果大于
corePoolSize,则直接释放,直到等于 corePoolSize
runWorker
在上面的 addWorker 方法中,会运行 thead.start()
启动线程,执行任务
我们知道,thread.start() 方法会执行传入 Runnable 对象的 run()
方法
1 2 3 4 5 6 7 8 Runnable worker = new Runnable () { @Override public void run () { System.out.println("Hello!" ); } }; Thread thread = new Thread (worker);thread.start();
Worker 是一个 Runnable 类,在 Worker 构造时,将自己传入 Thread 中
1 2 3 4 5 Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); }
在执行任务时,从 Worker 对象 w 中,获取 thread t,运行 t.start()
时,会执行 w.run() 方法
1 2 3 4 public void run () { runWorker(this ); }
进而运行 runWorker 方法,执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
过程分析:
如果 task 不为空,直接执行,执行完后将 task 设为空
如果 task 为空,循环不断地通过 getTask 从任务队列中获取任务
一次任务执行过程:加锁,调用 task.run 执行任务,释放锁
如果规定时间内无法从任务队列中获取任务,将 worker
从线程池中移除,结束工作线程
执行超时,也移除 worker
getTask
从阻塞队列中取一个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
流程分析:
如果当前状态是 STOP、TIDYING、TERMINATED,或者 SHUTDOWN
且任务队列为空,释放所有工作线程
如果任务队列是空的,且 workerCount
大于核心线程数,释放一个工作线程
从任务队列中取一个任务
如果线程没有设置存活时间,从任务队列中取任务是阻塞的,会一直等待有新的任务才返回;如果设置了存活时间,阻塞到
keepAlive 时返回 null
疑问:当线程等待任务超,且 workerCount > 1,会使用
compareAndDecrementWorkerCount 减少 ctl
的值(workerCount),并不会释放工作线程,只减少 ctl 有用吗
processWorkerExit
清理工作线程
输入:1. 工作线程;2. 任务是否意外完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
首先看下 completedAbruptly 的含义:
completedAbruptly = false,getTask 返回 null,任务未执行,workerCount
已经减 1
completedAbruptly = true,任务已执行,执行过程中发生异常导致
流程分析:
如果任务已执行,但中途发生异常,workerCount = workerCount - 1
更新已完成任务数(completedTaskCount 全局变量,加锁)
从工作队列中移除当前工作线程(workers 全局变量,加锁)
尝试结束线程池
判断是否要新增线程:如果线程池可以执行任务 & 任务未执行 &
线程数小于最小线程数,创建一个非核心线程
processWorkerExit() 执行完后,该工作线程的生命周期已经完结
线程的生命周期如下图:
线程状态变化.drawio
疑问:为什么清理工作线程时,要尝试结束线程池呢?
答:这是当线程池在收到 shutdown 命令时,进入 SHUTDOWN
状态,会拒绝新任务的提交,并继续执行任务队列中的任务,每执行完一个任务,判断下任务队列是否被清空,如果清空,则可以从
SHUTDOWN 状态进入 TIDYING 状态
下面看下 tryTerminate 方法
tryTerminate
尝试结束线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
流程分析:
线程池在运行中,不结束
线程池在 TIDYING、TERMINATED 状态,不需要结束
线程池在 SHUTDOWN 状态,且 任务队列不为空,不结束
如果可以结束,且工作线程队列不为空,中断一个空闲的工作线程
如果工作线程队列为空,将线程池状态改为 TIDYING,执行 terminated
方法,结束线程池
疑问:为什么只中断一个空闲的工作线程,这时不是已经没有任务了吗
这里只清理一个线程,是因为其他线程可能在执行任务,等他们执行完,自然会再清理一个线程;如果每个线程执行完,都清理所有空闲线程,显然会带来大量的并发冲突
interruptIdleWorkers
中断一个空闲的进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
流程:遍历工作线程集合,如果线程没有中断,且没有执行任务(可获得锁),中断该线程
shutdown
关闭线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
整体流程
下面用一个图将核心方法串联起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ExecutorService executorService = ThreadPoolExecutor executor = new ThreadPoolExecutor ( 5 , 10 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue <Runnable>(), new ThreadPoolExecutor .AbortPolicy() ); for (int i = 0 ; i < 10 ; i++) { final int taskId = i; executorService.submit(() -> { try { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } executorService.shutdown();
整体流程
总结
本文介绍了 jvm
线程池原理,对核心类和核心方法依次分析,对线程池整体执行流程、执行原理进行介绍,对于代码中的疑问,待后续解答
Reference
Java
线程池源码解析