0%

线程池原理解析

本文介绍 jdk 线程池原理

主流程

线程池主流程

状态控制

线程池的状态使用一个原子操作的变量 ctl 控制

1
2
// java.util.concurrent.ThreadPoolExecutor#ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 包含了两个参数:

  • runState:线程池状态
  • workerCount:工作的线程数

ctl 的结构:32 位整型,低 29 位存 workerCount,高 3 位存 runState,例如:

RUNNING:111-xxx

SHUTDOWN:000-xxx

STOP:001-xxx

TIDYING:010-xxx

TERMINATED:011-xxx

可以看到:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,明确状态之间的关系非常重要

如果用两个变量分别表示 A 和 B,在维护两者关系时,需要加锁保证两者的一致性

举个例子,在一个时刻先读 A 再读 B,代码中如果不加锁,可能读 A 的时候,B 的值被修改了,那么 A 和 B 的状态就不一致了;

为了避免对 A 和 B 加锁,使用一个原子变量代表 A 和 B,可以保障每次读 A 和 B 的值 状态是一致的

那么 A 和 B 哪个放在高位呢?应该将表示状态、分类的放高位,这样方便用比较大小来判断状态,不需要取出高位

生命周期

线程池状态

RUNNING:运行状态,线程池可以接收新的任务,可以处理已添加到任务队列中的任务

SHUTDOWN:关闭状态,当调用 shutdown() 方法后,线程池进入此状态。此时,线程池不再接收新任务,已存在任务队列中的任务继续执行直到他们全部完成

STOP:停止状态,当调用 shutdownNow() 方法后,线程池进入此状态。此时,线程池不再接收新任务,并尽力中断正在执行的任务,清空任务队列

TIDYING:整理状态,当所有任务都已结束,并且所有 worker 线程(除了 finalizer 线程)都已结束,线程池从 SHUTDOWN 或 STOP 状态转换到此状态,并执行 terminated() 方法进行清理工作

TERMINATED:在 terminated() 方法执行完,线程池进入此状态。此时,线程池终止,所有资源被释放,生命周期结束

结合上面状态之间的关系,可以用下面表达式来判断线程池一些行为:

  1. state >= SHUTDOWN:线程池不再接收新任务
  2. state >= STOP:线程池不再执行任务,任务队列为空
  3. state < STOP:线程池可以执行任务

核心类

ThreadPoolExecutor

管理线程池的核心类

重要概念
  • 线程池:为了复用线程来执行多个任务,以减少线程创建和销毁的开销,提高性能
  • 任务队列:用于存放待执行的任务,当有线程空闲时,会从队列中取出任务进行执行
主要属性
  • corePoolSize: 核心线程数
  • maximumPoolSize:最大线程数,线程池容量
  • keepAliveTime:线程空闲等待时间
  • workQueue:等待队列/任务队列
  • works:工作线程集合
  • threadFactory:线程工厂,用于创建线程(包括核心线程和非核心线程)
  • handler:拒绝策略

构造方法:

1
2
3
4
5
6
7
8
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // time unit
new LinkedBlockingQueue<Runnable>(), // workQueue
new ThreadPoolExecutor.AbortPolicy() // handler
);

关键方法在后面分析

Worker

工作线程,负责从任务队列中获取任务并执行,其设计目标是高效地管理任务调度和执行

本文代码基于 jdk 19.0.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}

类分析:

首先 Worker一个 Runnable 类,说明 Worker 可以被一个线程执行

Worker 又是一个 AbstractQueuedSynchronizer(AQS)类

AQS 是一个并发编程的框架,定义了一些模版方法,子类实现这些方法来指定锁行为,来实现独占锁或共享锁;比如 jdk 提供的实现类,如独占锁 ReentrantLock,共享锁 CountDownLatch、Semaphore

为什么 Worker 是一个 AQS 呢?可以看下 Worker 的抢锁方式:

1
2
3
4
5
6
7
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

可以看出,Worker 是一个独占 & 不可重入锁

独占是为了只有一个线程才能来执行 Worker 的任务,在执行过程中,其他线程不能来重复执行任务;不可重入即一个线程不能重复执行任务

构造方法:

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

核心类之间的关系

worker.drawio

核心方法

submit

向线程池提交一个任务

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

执行的事情:

  1. 封装任务对象
  2. 调用 execute 执行任务

execute

执行一个任务

要解决的事情:

  • 怎么创建线程:核心 or 非核心
  • 什么情况要放入等待队列
  • 什么情况要拒绝任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

流程分析:

  1. 如果工作线程数(workerCount)小于 corePoolSize,直接创建核心线程执行任务
  2. 如果 workerCount 大于等于 corePoolSize,向任务队列放入任务
  3. 二次检查线程池运行状态,如果 workerCount 等于 0,则创建一个非核心工作线程
  4. 如果向任务队列投放任务失败(任务队列已经满了),尝试创建非核心线程执行任务
  5. 如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务
线程池流程.drawio

addWorker

功能:创建一个工作线程

输入:任务,是否创建核心线程

输出:true 创建工作线程成功

代码走读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// java.util.concurrent.ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
// 拒绝创建任务的条件:
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 成功更新工作线程数,申请到一个位置
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 全局锁,因为会改变一些全局值(ctl, largestPoolSize)和非线程安全的集合(workers)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
// 将创建的工作线程加到工作线程集合中
workers.add(w);
workerAdded = true;
int s = workers.size();
// 更新线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 启动线程失败,从工作线程集合中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

上面对于边界判断比较复杂,这里先分析主要流程:

  1. 通过 CAS 的方式,尝试申请一个位置,分配给要创建的工作线程
  2. 创建工作线程
  3. 加锁,将工作线程加入到线程集合中,更新最大峰值,释放锁
  4. 启动线程,执行任务
  5. 启动失败,将现场从集合中移除,释放位置

疑问:这里创建的线程怎么没有区分是核心线程还是非核心线程,核心线程不是要多停留一段时间再释放吗?

答:不需要区分一个线程是核心的还是非核心的,只要控制线程数量就可以了,即:如果大于 corePoolSize,则直接释放,直到等于 corePoolSize

runWorker

在上面的 addWorker 方法中,会运行 thead.start() 启动线程,执行任务

我们知道,thread.start() 方法会执行传入 Runnable 对象的 run() 方法

1
2
3
4
5
6
7
8
Runnable worker = new Runnable() {
@Override
public void run() {
System.out.println("Hello!");
}
};
Thread thread = new Thread(worker);
thread.start();

Worker 是一个 Runnable 类,在 Worker 构造时,将自己传入 Thread 中

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

在执行任务时,从 Worker 对象 w 中,获取 thread t,运行 t.start() 时,会执行 w.run() 方法

1
2
3
4
// java.util.concurrent.ThreadPoolExecutor.Worker#run
public void run() {
runWorker(this);
}

进而运行 runWorker 方法,执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任务为空,则通过 getTask 从阻塞队列中获取一个任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

过程分析:

  1. 如果 task 不为空,直接执行,执行完后将 task 设为空
  2. 如果 task 为空,循环不断地通过 getTask 从任务队列中获取任务
  3. 一次任务执行过程:加锁,调用 task.run 执行任务,释放锁
  4. 如果规定时间内无法从任务队列中获取任务,将 worker 从线程池中移除,结束工作线程
  5. 执行超时,也移除 worker

getTask

从阻塞队列中取一个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// java.util.concurrent.ThreadPoolExecutor#getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 线程是否会被回收的条件:1. 核心线程设置了超时时间 或 2. 线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

流程分析:

  1. 如果当前状态是 STOP、TIDYING、TERMINATED,或者 SHUTDOWN 且任务队列为空,释放所有工作线程
  2. 如果任务队列是空的,且 workerCount 大于核心线程数,释放一个工作线程
  3. 从任务队列中取一个任务

如果线程没有设置存活时间,从任务队列中取任务是阻塞的,会一直等待有新的任务才返回;如果设置了存活时间,阻塞到 keepAlive 时返回 null

疑问:当线程等待任务超,且 workerCount > 1,会使用 compareAndDecrementWorkerCount 减少 ctl 的值(workerCount),并不会释放工作线程,只减少 ctl 有用吗

processWorkerExit

清理工作线程

输入:1. 工作线程;2. 任务是否意外完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// java.util.concurrent.ThreadPoolExecutor#processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

首先看下 completedAbruptly 的含义:

completedAbruptly = false,getTask 返回 null,任务未执行,workerCount 已经减 1

completedAbruptly = true,任务已执行,执行过程中发生异常导致

流程分析:

  1. 如果任务已执行,但中途发生异常,workerCount = workerCount - 1
  2. 更新已完成任务数(completedTaskCount 全局变量,加锁)
  3. 从工作队列中移除当前工作线程(workers 全局变量,加锁)
  4. 尝试结束线程池
  5. 判断是否要新增线程:如果线程池可以执行任务 & 任务未执行 & 线程数小于最小线程数,创建一个非核心线程

processWorkerExit() 执行完后,该工作线程的生命周期已经完结

线程的生命周期如下图:

线程状态变化.drawio

疑问:为什么清理工作线程时,要尝试结束线程池呢?

答:这是当线程池在收到 shutdown 命令时,进入 SHUTDOWN 状态,会拒绝新任务的提交,并继续执行任务队列中的任务,每执行完一个任务,判断下任务队列是否被清空,如果清空,则可以从 SHUTDOWN 状态进入 TIDYING 状态

下面看下 tryTerminate 方法

tryTerminate

尝试结束线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// java.util.concurrent.ThreadPoolExecutor#tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 不结束的3种情况
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// state = SHUTDOWN,任务队列为空,有工作线程
if (workerCountOf(c) != 0) { // Eligible to terminate
// 为什么只中断一个线程呢?为什么不全部中断呢?
interruptIdleWorkers(ONLY_ONE);
return;
}
// state = SHUTDOWN,任务队列为空,无工作线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将 state 改为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 将状态改为 TERMINATED
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

流程分析:

  1. 线程池在运行中,不结束
  2. 线程池在 TIDYING、TERMINATED 状态,不需要结束
  3. 线程池在 SHUTDOWN 状态,且 任务队列不为空,不结束
  4. 如果可以结束,且工作线程队列不为空,中断一个空闲的工作线程
  5. 如果工作线程队列为空,将线程池状态改为 TIDYING,执行 terminated 方法,结束线程池

疑问:为什么只中断一个空闲的工作线程,这时不是已经没有任务了吗

这里只清理一个线程,是因为其他线程可能在执行任务,等他们执行完,自然会再清理一个线程;如果每个线程执行完,都清理所有空闲线程,显然会带来大量的并发冲突

interruptIdleWorkers

中断一个空闲的进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers(boolean)
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

流程:遍历工作线程集合,如果线程没有中断,且没有执行任务(可获得锁),中断该线程

shutdown

关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 关闭权限校验
checkShutdownAccess();
// 设置 SHUTDOWN 状态
advanceRunState(SHUTDOWN);
// 中断所有空闲的工作线程
interruptIdleWorkers();
// 钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

整体流程

下面用一个图将核心方法串联起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建一个固定大小的线程池,包含 4 个线程
ExecutorService executorService = ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // time unit
new LinkedBlockingQueue<Runnable>(), // workQueue
new ThreadPoolExecutor.AbortPolicy() // handler
);
// 提交 10 个任务到线程池
for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
try {
// 执行任务
// ...
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

// 关闭线程池
executorService.shutdown();
整体流程

总结

本文介绍了 jvm 线程池原理,对核心类和核心方法依次分析,对线程池整体执行流程、执行原理进行介绍,对于代码中的疑问,待后续解答

Reference

Java 线程池源码解析