本文介绍 jdk 线程池原理
 
 
主流程 
 
线程池主流程 
 
状态控制 
线程池的状态使用一个原子操作的变量 ctl 控制
1 2 private  final  AtomicInteger  ctl  =  new  AtomicInteger (ctlOf(RUNNING, 0 ));
 
ctl 包含了两个参数:
runState:线程池状态 
workerCount:工作的线程数 
 
ctl 的结构:32 位整型,低 29 位存 workerCount,高 3 位存
runState,例如:
RUNNING:111-xxx
SHUTDOWN:000-xxx
STOP:001-xxx
TIDYING:010-xxx
TERMINATED:011-xxx
可以看到:RUNNING < SHUTDOWN < STOP < TIDYING <
TERMINATED,明确状态之间的关系非常重要
如果用两个变量分别表示 A 和
B,在维护两者关系时,需要加锁保证两者的一致性
举个例子,在一个时刻先读 A 再读 B,代码中如果不加锁,可能读 A
的时候,B 的值被修改了,那么 A 和 B 的状态就不一致了;
为了避免对 A 和 B 加锁,使用一个原子变量代表 A 和 B,可以保障每次读 A
和 B 的值 状态是一致的
那么 A 和 B
哪个放在高位呢?应该将表示状态、分类的放高位,这样方便用比较大小来判断状态,不需要取出高位
 
生命周期 
 
线程池状态 
 
RUNNING:运行状态,线程池可以接收新的任务,可以处理已添加到任务队列中的任务
SHUTDOWN:关闭状态,当调用 shutdown()
方法后,线程池进入此状态。此时,线程池不再接收新任务,已存在任务队列中的任务继续执行直到他们全部完成
STOP:停止状态,当调用 shutdownNow()
方法后,线程池进入此状态。此时,线程池不再接收新任务,并尽力中断正在执行的任务,清空任务队列
TIDYING:整理状态,当所有任务都已结束,并且所有 worker 线程(除了
finalizer 线程)都已结束,线程池从 SHUTDOWN 或 STOP
状态转换到此状态,并执行 terminated() 方法进行清理工作
TERMINATED:在 terminated()
方法执行完,线程池进入此状态。此时,线程池终止,所有资源被释放,生命周期结束
结合上面状态之间的关系,可以用下面表达式来判断线程池一些行为:
state >= SHUTDOWN:线程池不再接收新任务 
state >= STOP:线程池不再执行任务,任务队列为空 
state < STOP:线程池可以执行任务 
 
核心类 
ThreadPoolExecutor 
管理线程池的核心类
重要概念 
线程池:为了复用线程来执行多个任务,以减少线程创建和销毁的开销,提高性能 
任务队列:用于存放待执行的任务,当有线程空闲时,会从队列中取出任务进行执行 
 
主要属性 
corePoolSize: 核心线程数 
maximumPoolSize:最大线程数,线程池容量 
keepAliveTime:线程空闲等待时间 
workQueue:等待队列/任务队列 
works:工作线程集合 
threadFactory:线程工厂,用于创建线程(包括核心线程和非核心线程) 
handler:拒绝策略 
 
构造方法:
1 2 3 4 5 6 7 8 ThreadPoolExecutor  executor  =  new  ThreadPoolExecutor (        5 ,                     10 ,                    60L ,                   TimeUnit.SECONDS,          new  LinkedBlockingQueue <Runnable>(),          new  ThreadPoolExecutor .AbortPolicy()  ); 
 
关键方法在后面分析
Worker 
工作线程,负责从任务队列中获取任务并执行,其设计目标是高效地管理任务调度和执行
本文代码基于 jdk 19.0.1
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private  final  class  Worker         extends  AbstractQueuedSynchronizer          implements  Runnable      {                  private  static  final  long  serialVersionUID  =  6138294804551838833L ;                  final  Thread thread;                  Runnable firstTask;                  volatile  long  completedTasks;                  Worker(Runnable firstTask) {             setState(-1 );              this .firstTask = firstTask;             this .thread = getThreadFactory().newThread(this );         }                  public  void  run ()  {             runWorker(this );         }    } 
 
类分析:
首先 Worker一个 Runnable 类,说明 Worker 可以被一个线程执行
Worker 又是一个 AbstractQueuedSynchronizer(AQS)类
AQS
是一个并发编程的框架,定义了一些模版方法,子类实现这些方法来指定锁行为,来实现独占锁或共享锁;比如
jdk 提供的实现类,如独占锁 ReentrantLock,共享锁
CountDownLatch、Semaphore
 
为什么 Worker 是一个 AQS 呢?可以看下 Worker 的抢锁方式:
1 2 3 4 5 6 7 protected  boolean  tryAcquire (int  unused)  {    if  (compareAndSetState(0 , 1 )) {         setExclusiveOwnerThread(Thread.currentThread());         return  true ;     }     return  false ; } 
 
可以看出,Worker 是一个独占 & 不可重入锁
独占是为了只有一个线程才能来执行 Worker
的任务,在执行过程中,其他线程不能来重复执行任务;不可重入即一个线程不能重复执行任务
构造方法:
1 2 3 4 5 Worker(Runnable firstTask) {     setState(-1 );      this .firstTask = firstTask;     this .thread = getThreadFactory().newThread(this ); } 
 
核心类之间的关系 
 
worker.drawio 
 
核心方法 
submit 
向线程池提交一个任务
1 2 3 4 5 6 public  Future<?> submit(Runnable task) {    if  (task == null ) throw  new  NullPointerException ();     RunnableFuture<Void> ftask = newTaskFor(task, null );     execute(ftask);     return  ftask; } 
 
执行的事情:
封装任务对象 
调用 execute 执行任务 
 
execute 
执行一个任务
要解决的事情:
怎么创建线程:核心 or 非核心 
什么情况要放入等待队列 
什么情况要拒绝任务 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public  void  execute (Runnable command)  {        if  (command == null )             throw  new  NullPointerException ();         int  c  =  ctl.get();         if  (workerCountOf(c) < corePoolSize) {             if  (addWorker(command, true ))                 return ;             c = ctl.get();         }         if  (isRunning(c) && workQueue.offer(command)) {             int  recheck  =  ctl.get();             if  (! isRunning(recheck) && remove(command))                 reject(command);             else  if  (workerCountOf(recheck) == 0 )                 addWorker(null , false );         }         else  if  (!addWorker(command, false ))             reject(command);     } 
 
流程分析:
如果工作线程数(workerCount)小于
corePoolSize,直接创建核心线程执行任务 
如果 workerCount 大于等于 corePoolSize,向任务队列放入任务 
二次检查线程池运行状态,如果 workerCount 等于
0,则创建一个非核心工作线程 
如果向任务队列投放任务失败(任务队列已经满了),尝试创建非核心线程执行任务 
如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务 
 
 
线程池流程.drawio 
 
addWorker 
功能:创建一个工作线程
输入:任务,是否创建核心线程
输出:true 创建工作线程成功
代码走读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 private  boolean  addWorker (Runnable firstTask, boolean  core)  {        retry:         for  (int  c  =  ctl.get();;) {                        	             if  (runStateAtLeast(c, SHUTDOWN)                 && (runStateAtLeast(c, STOP)                     || firstTask != null                      || workQueue.isEmpty()))                 return  false ;             for  (;;) {                 if  (workerCountOf(c)                     >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))                     return  false ;               	                 if  (compareAndIncrementWorkerCount(c))                     break  retry;                 c = ctl.get();                   if  (runStateAtLeast(c, SHUTDOWN))                     continue  retry;                              }         }         boolean  workerStarted  =  false ;         boolean  workerAdded  =  false ;         Worker  w  =  null ;         try  {             w = new  Worker (firstTask);             final  Thread  t  =  w.thread;             if  (t != null ) {               	                 final  ReentrantLock  mainLock  =  this .mainLock;                 mainLock.lock();                 try  {                                                                                    int  c  =  ctl.get();                     if  (isRunning(c) ||                         (runStateLessThan(c, STOP) && firstTask == null )) {                         if  (t.getState() != Thread.State.NEW)                             throw  new  IllegalThreadStateException ();                       	                         workers.add(w);                         workerAdded = true ;                         int  s  =  workers.size();                       	                         if  (s > largestPoolSize)                             largestPoolSize = s;                     }                 } finally  {                     mainLock.unlock();                 }               	                 if  (workerAdded) {                     t.start();                     workerStarted = true ;                 }             }         } finally  {           	             if  (! workerStarted)                 addWorkerFailed(w);         }         return  workerStarted;     } 
 
上面对于边界判断比较复杂,这里先分析主要流程:
通过 CAS 的方式,尝试申请一个位置,分配给要创建的工作线程 
创建工作线程 
加锁,将工作线程加入到线程集合中,更新最大峰值,释放锁 
启动线程,执行任务 
启动失败,将现场从集合中移除,释放位置 
 
疑问:这里创建的线程怎么没有区分是核心线程还是非核心线程,核心线程不是要多停留一段时间再释放吗?
答:不需要区分一个线程是核心的还是非核心的,只要控制线程数量就可以了,即:如果大于
corePoolSize,则直接释放,直到等于 corePoolSize
 
runWorker 
在上面的 addWorker 方法中,会运行 thead.start()
启动线程,执行任务
我们知道,thread.start() 方法会执行传入 Runnable 对象的 run()
方法
1 2 3 4 5 6 7 8 Runnable  worker  =  new  Runnable () {    @Override      public  void  run ()  {         System.out.println("Hello!" );     } }; Thread  thread  =  new  Thread (worker);thread.start(); 
 
Worker 是一个 Runnable 类,在 Worker 构造时,将自己传入 Thread 中
1 2 3 4 5 Worker(Runnable firstTask) {     setState(-1 );      this .firstTask = firstTask;     this .thread = getThreadFactory().newThread(this ); } 
 
在执行任务时,从 Worker 对象 w 中,获取 thread t,运行 t.start()
时,会执行 w.run() 方法
1 2 3 4 public  void  run ()  {    runWorker(this ); } 
 
进而运行 runWorker 方法,执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 final  void  runWorker (Worker w)  {        Thread  wt  =  Thread.currentThread();   			         Runnable  task  =  w.firstTask;         w.firstTask = null ;         w.unlock();          boolean  completedAbruptly  =  true ;         try  {           	             while  (task != null  || (task = getTask()) != null ) {                 w.lock();                                                                                     if  ((runStateAtLeast(ctl.get(), STOP) ||                      (Thread.interrupted() &&                       runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                     wt.interrupt();                 try  {                     beforeExecute(wt, task);                     Throwable  thrown  =  null ;                     try  {                         task.run();                     } catch  (RuntimeException x) {                         thrown = x; throw  x;                     } catch  (Error x) {                         thrown = x; throw  x;                     } catch  (Throwable x) {                         thrown = x; throw  new  Error (x);                     } finally  {                         afterExecute(task, thrown);                     }                 } finally  {                     task = null ;                     w.completedTasks++;                     w.unlock();                 }             }             completedAbruptly = false ;         } finally  {             processWorkerExit(w, completedAbruptly);         }     } 
 
过程分析:
如果 task 不为空,直接执行,执行完后将 task 设为空 
如果 task 为空,循环不断地通过 getTask 从任务队列中获取任务 
一次任务执行过程:加锁,调用 task.run 执行任务,释放锁 
如果规定时间内无法从任务队列中获取任务,将 worker
从线程池中移除,结束工作线程 
执行超时,也移除 worker 
 
getTask 
从阻塞队列中取一个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private  Runnable getTask ()  {        boolean  timedOut  =  false ;          for  (;;) {             int  c  =  ctl.get();             int  rs  =  runStateOf(c);                          if  (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                 decrementWorkerCount();                 return  null ;             }             int  wc  =  workerCountOf(c);                        	             boolean  timed  =  allowCoreThreadTimeOut || wc > corePoolSize; 						             if  ((wc > maximumPoolSize || (timed && timedOut))                 && (wc > 1  || workQueue.isEmpty())) {                 if  (compareAndDecrementWorkerCount(c))                     return  null ;                 continue ;             }             try  {                 Runnable  r  =  timed ?                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                     workQueue.take();                 if  (r != null )                     return  r;                 timedOut = true ;             } catch  (InterruptedException retry) {                 timedOut = false ;             }         }     } 
 
流程分析:
如果当前状态是 STOP、TIDYING、TERMINATED,或者 SHUTDOWN
且任务队列为空,释放所有工作线程 
如果任务队列是空的,且 workerCount
大于核心线程数,释放一个工作线程 
从任务队列中取一个任务 
 
如果线程没有设置存活时间,从任务队列中取任务是阻塞的,会一直等待有新的任务才返回;如果设置了存活时间,阻塞到
keepAlive 时返回 null
疑问:当线程等待任务超,且 workerCount > 1,会使用
compareAndDecrementWorkerCount 减少 ctl
的值(workerCount),并不会释放工作线程,只减少 ctl 有用吗
 
processWorkerExit 
清理工作线程
输入:1. 工作线程;2. 任务是否意外完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private  void  processWorkerExit (Worker w, boolean  completedAbruptly)  {        if  (completedAbruptly)              decrementWorkerCount();         final  ReentrantLock  mainLock  =  this .mainLock;         mainLock.lock();         try  {             completedTaskCount += w.completedTasks;             workers.remove(w);         } finally  {             mainLock.unlock();         }         tryTerminate();         int  c  =  ctl.get();         if  (runStateLessThan(c, STOP)) {             if  (!completedAbruptly) {                 int  min  =  allowCoreThreadTimeOut ? 0  : corePoolSize;                 if  (min == 0  && ! workQueue.isEmpty())                     min = 1 ;                 if  (workerCountOf(c) >= min)                     return ;              }             addWorker(null , false );         }     } 
 
首先看下 completedAbruptly 的含义:
completedAbruptly = false,getTask 返回 null,任务未执行,workerCount
已经减 1
completedAbruptly = true,任务已执行,执行过程中发生异常导致
流程分析:
如果任务已执行,但中途发生异常,workerCount = workerCount - 1 
更新已完成任务数(completedTaskCount 全局变量,加锁) 
从工作队列中移除当前工作线程(workers 全局变量,加锁) 
尝试结束线程池 
判断是否要新增线程:如果线程池可以执行任务 & 任务未执行 &
线程数小于最小线程数,创建一个非核心线程 
 
processWorkerExit() 执行完后,该工作线程的生命周期已经完结
线程的生命周期如下图:
 
线程状态变化.drawio 
 
疑问:为什么清理工作线程时,要尝试结束线程池呢?
答:这是当线程池在收到 shutdown 命令时,进入 SHUTDOWN
状态,会拒绝新任务的提交,并继续执行任务队列中的任务,每执行完一个任务,判断下任务队列是否被清空,如果清空,则可以从
SHUTDOWN 状态进入 TIDYING 状态
下面看下 tryTerminate 方法
 
tryTerminate 
尝试结束线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 final  void  tryTerminate ()  {        for  (;;) {             int  c  =  ctl.get();           	             if  (isRunning(c) ||                 runStateAtLeast(c, TIDYING) ||                 (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))                 return ;           	             if  (workerCountOf(c) != 0 ) {                	                 interruptIdleWorkers(ONLY_ONE);                 return ;             } 						             final  ReentrantLock  mainLock  =  this .mainLock;             mainLock.lock();             try  {               	                 if  (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) {                     try  {                       	                         terminated();                     } finally  {                         ctl.set(ctlOf(TERMINATED, 0 ));                         termination.signalAll();                     }                     return ;                 }             } finally  {                 mainLock.unlock();             }                      }     } 
 
流程分析:
线程池在运行中,不结束 
线程池在 TIDYING、TERMINATED 状态,不需要结束 
线程池在 SHUTDOWN 状态,且 任务队列不为空,不结束 
如果可以结束,且工作线程队列不为空,中断一个空闲的工作线程 
如果工作线程队列为空,将线程池状态改为 TIDYING,执行 terminated
方法,结束线程池 
 
疑问:为什么只中断一个空闲的工作线程,这时不是已经没有任务了吗
这里只清理一个线程,是因为其他线程可能在执行任务,等他们执行完,自然会再清理一个线程;如果每个线程执行完,都清理所有空闲线程,显然会带来大量的并发冲突
 
interruptIdleWorkers 
中断一个空闲的进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private  void  interruptIdleWorkers (boolean  onlyOne)  {        final  ReentrantLock  mainLock  =  this .mainLock;         mainLock.lock();         try  {             for  (Worker w : workers) {                 Thread  t  =  w.thread;                 if  (!t.isInterrupted() && w.tryLock()) {                     try  {                         t.interrupt();                     } catch  (SecurityException ignore) {                     } finally  {                         w.unlock();                     }                 }                 if  (onlyOne)                     break ;             }         } finally  {             mainLock.unlock();         }     } 
 
流程:遍历工作线程集合,如果线程没有中断,且没有执行任务(可获得锁),中断该线程
shutdown 
关闭线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public  void  shutdown ()  {        final  ReentrantLock  mainLock  =  this .mainLock;         mainLock.lock();         try  {                          checkShutdownAccess();           	             advanceRunState(SHUTDOWN);           	             interruptIdleWorkers();           	             onShutdown();          } finally  {             mainLock.unlock();         }         tryTerminate();     } 
 
整体流程 
下面用一个图将核心方法串联起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ExecutorService  executorService  =  ThreadPoolExecutor  executor  =  new  ThreadPoolExecutor (        5 ,                     10 ,                    60L ,                   TimeUnit.SECONDS,          new  LinkedBlockingQueue <Runnable>(),          new  ThreadPoolExecutor .AbortPolicy()  ); for  (int  i  =  0 ; i < 10 ; i++) {   final  int  taskId  =  i;    executorService.submit(() -> {        try  {                       		         } catch  (InterruptedException e) {             Thread.currentThread().interrupt();         }     }); } executorService.shutdown(); 
 
 
整体流程 
 
总结 
本文介绍了 jvm
线程池原理,对核心类和核心方法依次分析,对线程池整体执行流程、执行原理进行介绍,对于代码中的疑问,待后续解答
Reference 
Java
线程池源码解析