02、ThreadPoolExecutor 线程池源码完整剖析 -----> 五种线程池状态、execute()、addWorker()、runWorker()和getTask() 方法解析
目录
ThreadPoolExecutor 线程池源码完整剖析
3、ThreadPoolExecutor 线程池源码剖析
还是以这个代码来展开

线程池 ThreadPoolExecutor 源码
线程池的五种状态解析:
先分析线程池的五种状态的特点:
源码:

分析图:

RUNNING(运行状态)
运行状态,该状态下线程池可以接受新的任务,也可以处理阻塞队列中的任务
执行 shutdown 方法可进入 SHUTDOWN 状态
执行 shutdownNow 方法可进入 STOP 状态
SHUTDOWN(待关闭状态)
待关闭状态,不再接受新的任务,继续处理阻塞队列中的任务
当阻塞队列中的任务为空,并且工作线程数为0时,进入 TIDYING 状态
STOP(停止状态)
停止状态,不接收新任务,也不处理阻塞队列中的任务,并且会尝试结束执行中的任务
当工作线程数为0时,进入 TIDYING 状态
TIDYING(整理状态)
整理状态,此时任务都已经执行完毕,并且也没有工作线程
执行 terminated 方法后进入 TERMINATED 状态
TERMINATED(终止状态)
终止状态,此时线程池完全终止了,并完成了所有资源的释放
execute() 执行任务的方法 源码分析
execute() 方法是什么?
execute() 方法 是 线程池类ThreadPoolExecutor 中的一个方法。
通过 线程池对象 调用这个 execute() 方法,然后传入一个参数Runnable对象,Runnable对象就是我们让线程要执行的任务。
execute() 方法里面会进行一些判断,用于执行添加核心线程或非核心线程、以及抛异常或者触发任务拒绝策略等操作。
点进 execute() 方法,看代码的执行逻辑

详细分析execute() 方法源码思路:


下面的就是对这个execute() 方法进行展开分析:
线程池五种状态对应的二进制分析:
源码分析
如图:
解释下 ctl 这个变量:ctl 存储了线程池的状态和目前正在工作的线程数量
(这个 AtomicInteger 类保证了这个线程操作数值的安全)
ctl 这一个变量存了多个参数,肯定是使用了二进制的方式来表示。

ctl 这一个变量存了多个参数(线程池的状态和目前正在工作的线程数量 ),肯定是使用了二进制的方式存储。
debug演示
还是上面的代码,启动后,获取ctl的值,用来分析 线程池的状态和工作线程的数量

可以看到此时 ctl 获取到的值是 -536870912
把这个值转成二进制来看:

( 因为 变量ctl 是int类型,占4个字节,就是32位。就看到32位就好)
1110 0000 0000 0000 0000 0000 0000 0000
从这个二进制值可以看出这个线程池处于正在运行的状态
二进制分析
解释为什么能看出线程池是什么状态:
如图:源码中设置的5种状态的对应的值。
-1 << COUNT_BITS 的意思对 -1 进行算术左移操作,移动的位数是COUNT_BITS(29位)

查下 -1 左移29位后的二进制数

线程池5种状态对应的二进制数:
注意:
因为有5种状态,所以需要用到3个bit。
为什么用到3bit?
如下图:五种状态在源码规定的往左移29位之后,是这样的:
RUNNING --> 111
SHUTDOWN --> 000
STOP --> 001
TIDYING --> 010
TERMINATED --> 011
所以五种状态,用3个bit来组合刚好够用。
1字节(byte) = 8 比特(bit,也可以说 位)
源码的设计是这样的,ctl 的长度是32位,前3位表示线程池的状态,后面的29位,表示正在工作的线程的数量。


所以回到上面的问题解答:

-536870912 转二进制:1110 0000 0000 0000 0000 0000 0000 0000
前3位是111,后面29位全是0。
跟线程池处于Running状态的二进制数一样
表示此时代码中的线程池处于运行的状态,然后工作线程的数量是0个。
====================================================
根据判断添加核心线程源码分析:
继续分析execute()方法的源码
这段代码的逻辑,就是上面说过的,通过ctl.get()方法拿到当前线程池的运行状态和工作线程数后,
if (workerCountOf© < corePoolSize) 这段代码用来判断当前线程池的工作线程数量是否小于核心线程数,如果小于核心线程数,那么就添加一个核心线程。

代码解释:
调用 workerCountOf© 方法,这段代码就是拿到c的值,这个值是二进制数,取出它二进制数后面29位,判断有没有工作线程。
c 的值是 -536870912 转二进制:1110 0000 0000 0000 0000 0000 0000 0000
COUNT_MASK = (1 << COUNT_BITS) - 1 : 1 转二进制数后,往左移29位再减1,得出的值是 536870911 ,如下图得出值
COUNT_MASK 的值是 536870911 ,转二进制是 0001 1111 1111 1111 1111 1111 1111 1111
然后 workerCountOf© 方法是要 return c & COUNT_MASK;
就是要拿 c 的二进制值 和 COUNT_MASK 的二进制值进行 & 运算,就是 按位与运算。

根据按位与运算的规则,对应位上的两个数如果都是1,结果就是1,否则是0。如图:

所以 workerCountOf(int c) { return c & COUNT_MASK; } 最后返回的是 0,因为0小于我们设置的核心线程数2,所以会添加一个核心线程
这个 true 表示是添加核心线程。
总结:这个if判断的代码就是用来添加核心线程的。

继续往下判断

这个 isRunning© 条件判断,判断这个 c (就是当前的线程池)状态是否 < SHUTDOWN ,
因为在线程池的五种状态中,SHUTDOWN = 0,比它小的之后 RUNNING ( -1 ) ,
所以主要用来判断当前线程池是否处于 RUNNING运行状态
workQueue.offer(command) 这个条件判断,是往队列中添加任务(如果添加成功,返回true,否则返回false),返回false表示往队列中添加任务失败,就是表示线程池的阻塞队列已经满了

addWorker() 添加工作方法 源码分析
addWorker() 里面的双层for循环分析:
源码:
先分析这一部分源码:

源码解析:


截图详细解析
retry 标签的用法:
定义在最外层的for循环外面,break 可以根据这个标签的位置跳出最外层的for循环,这是一种写法。

for (; ; ){ }
这种没有条件的for循环,相当于 while(true),一样,一直循环,是死循环。

外循环的判断条件分析:
这个判断:判断当前线程池的运行状态
这个判断就是:当前线程池如果不是 Running 运行状态,那么就返回false,直接结束循环。
就没必要往下执行添加线程的操作了

这个判断:判断是否能添加核心线程或非核心线程
按位与 (&) 算法:
对于两个整数的每一位,如果两个相应位同时为 1,则结果的相应位为 1;否则为 0。
再解析这部分判断:
在调用addWorker() 方法的时候,会传一个布尔值 core 参数,
如果core为true,表示调用这个方法是要创建核心线程;
如果core为false,表示调用这个方法是要创建非核心线程;
然后如图:这个if条件判断:
是拿当前线程池的工作线程数去进行判断,
如果调用addWorker() 方法是要创建核心线程,而当前工作线程数大于核心线程数的话,则直接return结束循环,返回false; 不进行添加核心线程的操作。
如果是要创建非核心线程,而当前工作线程数大于最大线程数的话,则直接return结束循环,返回false; 不进行添加非核心线程的操作。

上面进行判断后,到这里就可以添加线程了。
调用这个compareAndIncrementWorkerCount© 方法,通过 CAS 操作,给 ctl 值进行 +1 的操作,就是在 ctl 的二进制数值的表示工作线程数的后29位那里进行+1操作。
通过 CAS 操作,能够在不断的循环中保证线程是安全的
添加线程数成功,则跳出最外层for循环,
添加失败的话,则继续往下判断。
注意:这里只是给线程池的工作线程数 +1 ,但是还没有真正创建该线程。

上面添加线程数成功,则跳出最外层for循环,
如果添加线程数失败的话,
则重新获取 ctl 的值,判断我们这个线程在执行这个内部for循环期间,有没有其他线程修改了这个线程池的状态,如果被修改成非 Running 运行状态,则直接跳过本次的双层for循环。
从头再开始判断。

部分总结
判断当前线程池的状态,如果是Running运行状态,则继续往下走;
如果当前线程池的工作线程数小于核心线程数,则可以添加核心线程;
如果当前线程池的工作线程数小于最大线程数,则可以添加非核心线程;达成这条件,继续往下走;
通过 CAS 操作,对当前线程池的工作线程数量进行 +1 操作,如果工作线程+1成功,则跳出这个双层for循环,继续往下走;
如果添加工作线程失败,再分两种情况:
如果当前线程池的状态依然是 Running 运行状态,则只需要一直重复内层for循环,直到 +1 成功,才 break 跳出双层for循环,继续走下去;
如果当前线程池的状态不是 Running 运行状态,被其他线程修改成其他状态了,则 continue 跳过这次双重for循环,重新重头开始双层for循环,直到 +1 成功,才 break 跳出双层for循环,继续走下去;

addWorker() 方法创建新工作线程源码部分
源码部分:


源码解释:



创建工作线程对象分析图:
线程start方法和run方法的关系:
start 方法只是用于启动一个新线程;
start 方法启动了一个新线程执行 run 方法中的任务代码,而直接调用 run 方法只是在当前线程中执行任务代码,没有新线程的效果。
run 方法包含了线程的实际任务代码,在新线程启动后会被自动调用来执行任务
总的来说:
线程池对象executor,调用execute()方法,execute()方法需要一个Runnable 类型的参数,这个参数对象里面有一个run方法,run方法里面封装着一些业务逻辑。
当我们把要执行的业务逻辑封装在Runnable 对象的run方法里面,作为参数传递给execute()方法,线程池就会根据自己的判断,或重用或创建线程,来执行这个Runnable 对象里面的run方法。
这个分析图是为了说明,addWorker方法里面最后新创建的工作线程,在调用start()方法启动线程后,执行的run方法,就是一开始线程池对象调用execute方法时传进去的Runnable对象里面的 run() 方法。(executor.execute(new Runnable(){})
就是执行封装着打印功能的run方法。



判断当前线程状态
如果当前线程池状态是 Running 运行状态,或者是SHUTDOWN待关闭状态且传进来的任务是空的,才能继续往下执行

判断线程是否已经运行起来了:
就是判断此时的线程是否已经被start 启动过了,如果在其他地方被调用start方法启动了,那么这里后续就不能再调用线程的start方法了来执行其他操作了,所以抛个异常

这个workers是一个HashSet集合。

addWorker 完整源码解释截图





上面已经分析了工作线程的创建和启动。
接下来就是分析工作线程执行封装着业务逻辑的Runnable对象中的run方法的逻辑。
runWorker() 执行run方法的逻辑分析
再回顾下这个代码逻辑

根据上面的分析,t.start() 工作线程启动后,就是执行Runnable对象中的run方法,就是执行这个 Worker 对象的run方法,
如图可以看出,worker对象的run方法传进一个this,这个this 就是Worker 对象本身;
而Worker 对象本身又存着 Runnable firstTask;这个成员变量;
而Runnable firstTask;这个成员变量就是一开始executor.execute(new Runnable(){}传进来的那个Runnable对象,而这个对象的run方法就封装着我们写的打印功能。
简单来说,绕来绕去,最终执行的就是执行 封装着我们写的打印功能 的那个run方法。
上面解释的执行这个 Worker 对象的run方法,那么肯定就调用到方法体里面的这个runWorker()方法,接下来就是对这个runWorker方法进行解释:
这个runWorker()方法具体就是在写工作线程的工作执行逻辑
一系列判断后,走到这个 task.run() ;这里,调用到了我们封装业务逻辑的run方法

runWorker() 源码:


源码解释


getTask() 方法
getTask() 方法: 从阻塞队列里面取任务的方法
源码:

源码解释:


截图详细分析
如图:演示下获取该对象的变量task后,再将该变量设置为null,那刚刚获取到的task的引用还有值吗?
答案是有的。

根据上面代码提问:一个demo对象有一个user类成员变量,然后用task接收 demo对象获取user这个成员变量,然后再把demo.user=null 设置为null,那此时的task还有值吗,还是也变成null,
解释:
堆中有 demo 和 user 两个对象,demo.user 变量存储的不是User对象本身,而是指向User对象的引用。
执行 demo.user = null; 时,只是断开了 demo 对象中对 user 成员变量的引用关系,并将其设置为 null,
但是实际的 User 对象本身依然存在,并不会因为 demo.user 的变化而被销毁
因为此时的task 还是引用着User对象,所以task还有值。
如图:解释这部分代码
这个代码没什么需要注意的,正常理解即可

钩子方法:
是一种重要的设计模式,它通过在父类中留出可重写的空方法,为子类提供了定制算法行为的灵活性

task.run(); 调用解释
就是调用我们封装业务逻辑的run方法

while 循环判断:
第一次执行的就是传进来的任务,后面主要就是看线程如何重复获取阻塞队列里面的任务来执行 。
getTask():从阻塞队列里面取任务的方法
这个while 的条件如果都是null,意味着这个线程的整个run方法就执行结束了,那么这个线程就进入销毁阶段了,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁。

主要来看这个条件判断 (task = getTask()) != null
表示从阻塞队列里面也没取到任务,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁
getTask() 阻塞队列里面取任务的方法
先看这部分,如果getTask()方法在阻塞队列中获取任务时,判断到当前线程池不是Running运行时状态,或者阻塞队列为空,那么直接返回null

单纯截图方便看对应的方法

假设当前线程是核心线程或非核心线程两种情况来看处理逻辑:
解释 allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
因为allowCoreThreadTimeOut默认是false,所以只需要针对 wc > corePoolSize 来假设两种情况:
假设的情况一::假设当前工作线程数是 3 , 核心线程数是 2 ; 那么 timed —> 此时就为true
假设的情况二::假设当前工作线程数是 2 , 核心线程数是 2 ; 那么 timed —> 此时就为false
出现的情况结果如图

4、一些演示:
debug看线程池的一些数值变化
这是我创建线程池的数值:
核心线程数为2;最大线程数为4;非核心线程存活时间 60s;缓存任务的阻塞队列,容量为10,每个线程执行时间睡眠100秒,方便分析
演示设置的参数是否如所想的一样执行,看是不是先创建2个核心线程,还有任务就放阻塞队列,阻塞队列满了就继续创建非核心线程,达到最大线程数后还有任务进来,则线程池执行拒绝策略。

演示开始:

演示设置的参数是否如所想的一样执行,看是不是先创建2个核心线程,还有任务就放阻塞队列,阻塞队列满了就继续创建非核心线程,达到最大线程数后还有任务进来,则线程池执行拒绝策略
如果,线程池是按这个顺序在执行

演示 allowCoreThreadTimeOut 变量
allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
标记为true,看核心线程会不会被销毁。

演示思路:

本来想演示看核心线程在非核心线程存活时间过去后,还有没有存在。但是这个for循环没执行完就跳到最后面这个打印功能,导致演示不出来。

直接说要演示的结果:
allowCoreThreadTimeOut 默认是false,如果设置为true,那么2个核心线程没任务的话也会被销毁

大概是这样:
核心线程是0

5、总结流程图:
整个ThreadPoolExecutor 线程池源码 大概就这个逻辑。

这个是几年前的 ThreadPoolExecutor 类思路图,拿来参考下;
此时的 ThreadPoolExecutor 类 的源代码也已经发生一些修改了,但是主要逻辑不变。

6、代码:
demo
package cn.ljh.algorithmic;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* author JH
*/
public class Demo09
{
public static void main(String[] args)
{
//创建一个线程池对象
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, //核心线程数为 2
4, //最大线程数为 4
5, //非核心线程存活时间
TimeUnit.SECONDS, //非核心线程存活时间单位:秒
new ArrayBlockingQueue<>(10), //缓存任务的阻塞队列,容量为10
new ThreadFactory() //创建线程
{
int count = 0;
@Override
public Thread newThread(Runnable r)
{
return new Thread(r, "自定义的线程名称:" + (++count));
}
},
new ThreadPoolExecutor.AbortPolicy() //默认拒绝策略
//new ThreadPoolExecutor.DiscardPolicy() //也是丢弃任务,但是不抛出异常
//new ThreadPoolExecutor.DiscardOldestPolicy() //丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)
//new ThreadPoolExecutor.CallerRunsPolicy() //由调用线程处理该任务,这里是main这个线程调用这个线程池
);
//allowCoreThreadTimeOut 默认是false,设置为true则核心线程能被销毁
// executor.allowCoreThreadTimeOut(true);
//线程池执行 i 个任务
for (int i = 0; i < 14; i++)
{
final int x = i;
// if (i == 10)
// {
// executor.shutdownNow();//关闭线程池:调用这个方法之后,后续添加的任务是不会再被执行了,但是已经加入的任务,如果还没有开始执行,就不会再去执行了。
// //executor.shutdown();//关闭线程池:调用这个方法之后,后续添加的任务是不会再被执行了,但是已经加入的任务会继续执行完
// }
//线程执行的任务
executor.execute(new Runnable()
{
@Override
public void run()
{
System.err.println("当前线程名称:【 "
+ Thread.currentThread().getName()
+ " 】" + " , 执行第【 " + x + " 】个任务");
//添加睡眠时间,增加这条线程的执行时间,那么线程池就会调用另一个线程来继续执行任务。
try
{
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
});
}
System.err.println("线程执行完");
}
}
ThreadPoolExecutor 类 源代码
package java.util.concurrent;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolExecutor extends AbstractExecutorService
{
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE 是 32位 ; 32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1 往左移动29位 ,再 -1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
//线程池的五种运行状态,状态以二进制的形式进行存储
private static final int RUNNING = -1 << COUNT_BITS; //运行状态
private static final int SHUTDOWN = 0 << COUNT_BITS; //待关闭状态
private static final int STOP = 1 << COUNT_BITS; //停止状态
private static final int TIDYING = 2 << COUNT_BITS; //整理状态
private static final int TERMINATED = 3 << COUNT_BITS; //终止状态
// Packing and unpacking ctl
private static int runStateOf(int c)
{
//~ 符号代表按位取反操作符,它会翻转操作数的每一位,即对操作数的每一位进行取反操作(0 变为 1,1 变为 0)
return c & ~COUNT_MASK;
}
private static int workerCountOf(int c)
{
//按位与 (&) 算法:对于两个整数的每一位,如果两个相应位同时为 1,则结果的相应位为 1;否则为 0。
return c & COUNT_MASK;
}
private static int ctlOf(int rs, int wc)
{
//| :按位或操作;按位或操作会将每个对应位的值进行逻辑或运算,即若两个位中任意一个位为 1,则结果对应位为 1,否则为 0
return rs | wc;
}
private static boolean runStateLessThan(int c, int s)
{
return c < s;
}
private static boolean runStateAtLeast(int c, int s)
{
return c >= s;
}
private static boolean isRunning(int c)
{
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect)
{
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect)
{
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount()
{
ctl.addAndGet(-1);
}
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<>();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
//Worker 对象实现类 Runnable 类 ,所以也是一个Runnable对象
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
//常量
private static final long serialVersionUID = 6138294804551838833L;
//属性
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
//构造器
Worker(Runnable firstTask)
{
setState(-1);
//把传进来的任务类(firstTask)设置到 Worker 对象的成员变量 firstTask 里面
this.firstTask = firstTask;
//调用线程工厂的 newThread 方法,创建一个线程,然后把这个线程设置到 Worker 对象的 thread 变量里面。
//this :在Worker构造器里面,就是指这个Worker 对象
this.thread = getThreadFactory().newThread(this);
}
public void run()
{
runWorker(this);
}
protected boolean isHeldExclusively()
{
return getState() != 0;
}
protected boolean tryAcquire(int unused)
{
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused)
{
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock()
{
acquire(1);
}
public boolean tryLock()
{
return tryAcquire(1);
}
public void unlock()
{
release(1);
}
public boolean isLocked()
{
return isHeldExclusively();
}
void interruptIfStarted()
{
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted())
{
try
{
t.interrupt();
} catch (SecurityException ignore)
{
}
}
}
}
private void advanceRunState(int targetState)
{
// assert targetState == SHUTDOWN || targetState == STOP;
for (; ; )
{
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
final void tryTerminate()
{
for (; ; )
{
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0)
{ // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
{
try
{
terminated();
} finally
{
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally
{
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void checkShutdownAccess()
{
// assert mainLock.isHeldByCurrentThread();
SecurityManager security = System.getSecurityManager();
if (security != null)
{
security.checkPermission(shutdownPerm);
for (Worker w : workers)
security.checkAccess(w.thread);
}
}
private void interruptWorkers()
{
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
private void interruptIdleWorkers(boolean onlyOne)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
for (Worker w : workers)
{
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock())
{
try
{
t.interrupt();
} catch (SecurityException ignore)
{
} finally
{
w.unlock();
}
}
if (onlyOne)
break;
}
} finally
{
mainLock.unlock();
}
}
private void interruptIdleWorkers()
{
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
final void reject(Runnable command)
{
handler.rejectedExecution(command, this);
}
void onShutdown()
{
}
private List<Runnable> drainQueue()
{
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
q.drainTo(taskList);
if (!q.isEmpty())
{
for (Runnable r : q.toArray(new Runnable[0]))
{
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
//添加线程的方法
private boolean addWorker(Runnable firstTask, boolean core)
{
//这是一个标签,可以根据标签的位置,帮助 break 跳出for循环,这里是帮助break跳出最外面的循环
retry:
//不断循环,每次循环都获取一次 ctl 的值。里面存着当前线程池的运行状态和工作线程数
for (int c = ctl.get(); ; )
{
//整体判断:判断当前线程池的状态是否 >= SHUTDOWN 且 >= STOP,或者 firstTask这个 Runnable对象不能为空,或者 workQueue 阻塞队列为空。
//简单的说:这个判断就是:当前线程池如果不是 Running 运行状态,那么就返回false,就没必要往下执行添加线程的操作了
//条件1:runStateAtLeast(c, SHUTDOWN):判断线程池当前的状态是否 >= SHUTDOWN 状态
//条件2:(runStateAtLeast(c, STOP):判断线程池当前的状态是否 >= STOP 状态
//条件3:firstTask != null:传进来的 Runnable 对象 不能为空
//条件4:workQueue.isEmpty():如果workQueue队列为空,则返回true
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
//根据判断,线程池都不是运行时状态,就没必要往下执行添加线程的操作了。直接 return false
return false;
//没有条件的for循环,相当于 while(true),是不断循环的
for (; ; )
{
//整体判断:通过传进来的 core 进行判断:
//core 为 true,表示要创建核心线程,则【判断当前线程池的工作线程数是否大于我们一开始设置的核心线程数,大于的话就不能再添加核心线程了】,返回return false,结束循环
//core 为 false,表示要创建非核心线程,则【判断当前线程池的工作线程数是否大于我们一开始设置的最大线程数,大于的话就不能再添加非核心线程了】,返回return false,结束循环
// & 位于运算 优先级 高于 >= 运算
//条件1:workerCountOf(c):当前线程池状态的二进制数,和 COUNT_MASK 变量去进行 & 按位与运算,用于获取:【当前的工作线程数量】
//条件2:(core ? corePoolSize : maximumPoolSize) ====> core == true ,表示核心线程数 ; core == false,表示最大线程数
//通过三元运算,拿核心线程数或最大线程数,和 COUNT_MASK 进行 & 按位与 运算。
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 通过 CAS 操作,给 ctl 值进行 +1 的操作,就是在 ctl 的二进制数值的表示线程数的后29位那里进行+1操作
if (compareAndIncrementWorkerCount(c))
//如果 +1 成功,则跳出外层for循环
//通过这个retry标签的位置,跳出最外面的循环
break retry;
//如果自增+1失败,重新获取 ctl 的值
c = ctl.get();
//如果重新获取的线程池运行状态,和 for 循环外获取的线程池状态不一样,
//(因为在这个线程的内层for循环期间,可能有其它线程调用了 SHUTDOWN 等方法,导致线程池状态已经被改变了。
// 所以需要再判断一次,重新走一遍外层for循环,用于重新判断线程池状态)
if (runStateAtLeast(c, SHUTDOWN))
//经判断,如果线程池不是Running运行状态,则跳出这双层for循环。
continue retry;
}
}
//判断是否开启工作线程
boolean workerStarted = false;
//判断线程是否成功添加
boolean workerAdded = false;
//定义工作线程的变量
Worker w = null;
try
{
//创建Worker工作线程对象,firstTask是 Runnable 类对象,作为任务参数被传进来;
//Worker 对象实现类 Runnable 类 ,所以也是一个Runnable对象
w = new Worker(firstTask);
//取出 Worker 对象中的 thread 属性
final Thread t = w.thread;
//判断 t 线程不为空(自定义线程工厂是有可能返回 null 的)
if (t != null)
{
//进行加锁操作 ,是可重入锁,就是这个线程可以多次获取这个锁的资源
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
//获取当前线程池的状态
int c = ctl.get();
//整体来说:如果当前线程池状态是 Running 运行状态,或者是SHUTDOWN待关闭状态且传进来的任务是空的,才能继续往下执行
//条件1:如果当前线程池的状态 < SHUTDOWN,当前线程池为 Running 运行状态,则为true
//条件2: 当前线程池的状态 < STOP 且传进来的 firstTask == null
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null))
{
//t.getState() 获取当前线程的状态
//Thread.State.NEW 是 Java 中线程的一种状态,表示线程已经被创建,但尚未启动
//判断线程池是否已经运行起来了,如果线程状态是已经被启动了,表示后续不能再调用线程的start方法了,所以抛个异常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//Set 集合,存储目前有哪些工作线程对象;用于后续销毁的时候知道有哪些工作线程
workers.add(w);
//标记线程成功添加
workerAdded = true;
//获取集合中工作线程的数量
int s = workers.size();
//largestPoolSize 标记线程池中使用到的最大线程数,用来统计而已。
if (s > largestPoolSize)
//如果此时的工作线程数量超过记载的使用到的最大线程数量,则更新
largestPoolSize = s;
}
} finally
{
//释放锁
mainLock.unlock();
}
//线程添加成功,workerAdded 为 true
if (workerAdded)
{
//启动这个刚添加的工作线程,就是启动 Worker对象 中的 thread 线程
t.start();
//标记新添加的工作线程已经被启动了
workerStarted = true;
}
}
} finally
{
if (!workerStarted)
//启动线程失败后的操作
addWorkerFailed(w);
}
//返回新添加的这个工作线程的启动状态
return workerStarted;
}
private void addWorkerFailed(Worker w)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally
{
mainLock.unlock();
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly)
{
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally
{
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
{
if (!completedAbruptly)
{
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
//从阻塞队列里面取任务的方法
private Runnable getTask()
{
//是否超时的标记
boolean timedOut = false;
//不断循环
for (; ; )
{
//获取当前线程池的状态和工作线程数
int c = ctl.get();
//判断线程池的状态(如果是非running状态为true) || 阻塞队列是否为空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty()))
{
//worker 工作线程数量 -1 操作
decrementWorkerCount();
//线程池非Running运行状态或者阻塞队列没有任务为空了,直接返回null
return null;
}
//获取当前线程池中的工作线程数量
int wc = workerCountOf(c);
//allowCoreThreadTimeOut : 是否允许核心线程超时,默认是false; 如果把这个标记成true,那么核心线程就可以被销毁掉
//wc > corePoolSize:当前线程池的工作线程数是否 > 核心线程数
//假设的情况一::假设当前工作线程数是 3 , 核心线程数是 2 ; 那么 timed ===> 此时就为true
//假设的情况二::假设当前工作线程数是 2 , 核心线程数是 2 ; 那么 timed ===> 此时就为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//当前线程池中的工作线程数量 > 最大线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
{
//通过原子性的操作,把工作线程数 -1 ,然后返回null
if (compareAndDecrementWorkerCount(c))
//返回null,跳出for循环
return null;
continue;
}
try
{
//三元运算符,假设的情况一:那么timed ===> true
//假设等待60s都没有获取到元素,那么这个 r == null
Runnable r = timed ?
//假设情况一判断:在设定的keepAliveTime(非核心线程存活时间)之内获取元素,如果能获取到就返回该元素对象,获取不到返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//假设情况二的判断:不会有超时时间,会一直阻塞等待,直到有任务为止,代码走到这里就会一直卡在这里,那么就能保证这个线程就不会被销毁
workQueue.take();
if (r != null)
return r;
//假设的情况一: ,r == null 的话,那么是否超时的标记设置为true
timedOut = true;
} catch (InterruptedException retry)
{
timedOut = false;
}
}
}
//工作线程的工作逻辑
final void runWorker(Worker w)
{
//获取当前执行的线程
Thread wt = Thread.currentThread();
//获取传入的Runnable任务对象
Runnable task = w.firstTask;
//将任务对象设置为空
w.firstTask = null;
//允许中断情况下的解锁
w.unlock();
boolean completedAbruptly = true;
try
{
//判断任务是否为空(一般情况下,第一次传进来的任务task不为空)
//getTask():从阻塞队列里面取任务的方法
//(task = getTask()) != null 表示从阻塞队列里面也没取到任务,如果在存活时间内没获取任务,那么该线程如果不是核心线程就会被销毁
while (task != null || (task = getTask()) != null)
{
//对当前这个任务线程加锁
w.lock();
//判断是否要中断当前线程
//ctl.get() 获取当前线程池的状态和工作线程数 ;Thread.interrupted()判断当前线程是否被中断,中断则返回true
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程
wt.interrupt();
try
{
//钩子方法:是一种重要的设计模式,它通过在父类中留出可重写的空方法,为子类提供了定制算法行为的灵活性
beforeExecute(wt, task);
try
{
//执行传入的任务类对象run方法
task.run();
afterExecute(task, null);
} catch (Throwable ex)
{
//捕捉异常
afterExecute(task, ex);
throw ex;
}
} finally
{
task = null;
w.completedTasks++;
//最后进行解锁操作
w.unlock();
}
}
completedAbruptly = false;
} finally
{
processWorkerExit(w, completedAbruptly);
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
{
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* executor.execute(new Runnable()------来到这个方法
* 线程池的execute()方法用于向线程池提交一个任务,该任务将由线程池中的线程来执行。
* 在execute()方法中,通常会传入一个Runnable对象,该对象表示要执行的任务逻辑。
* 这个方法就是通过一些判断来添加核心线程或非核心线程等操作。
*/
public void execute(Runnable command)
{
//判断任务是否为空,空的话直接抛异常
if (command == null)
throw new NullPointerException();
//获取到 ctl 的值,ctl 存储了线程池的状态和目前工作线程数
int c = ctl.get();
//意思: 判断如果 目前工作线程数 < 核心线程数,那么就添加一个核心线程
if (workerCountOf(c) < corePoolSize)
{
//增加一个核心线程,这个true表示是要添加核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果 工作线程数 > 核心线程数,则继续判断
//意思:判断如果当前线程池是RUNNING运行状态,且往阻塞队列里面添加任务成功,
//条件1 :判断当前线程池的运行状态,源码是判断 c < SHUTDOWN
//条件2:往队列中添加任务(如果添加成功,返回true,否则返回false)
if (isRunning(c) && workQueue.offer(command))
{
//重新获取ctl的值,作用是为了
int recheck = ctl.get();
//意思:这里有!取反,表示:如果当前线程不是运行状态,且移除刚刚添加到阻塞队列的任务的操作失败,就执行任务拒绝策略
//条件1:如果线程池不处于 RUNNING 运行状态
//条件2:把刚添加的任务移除掉,移除成功返回true,否则返回false
if (!isRunning(recheck) && remove(command))
//执行任务拒绝策略
reject(command);
//如果工作线程为0,继续添加工作线程
else if (workerCountOf(recheck) == 0)
// false 表示添加非核心线程
addWorker(null, false);
}
//任务满了,添加非核心线程,这个 false 表示是要添加非核心线程
//整个条件判断的返回值是表示是否添加成功
else if (!addWorker(command, false))
//添加非核心线程失败,执行拒绝策略
reject(command);
}
public void shutdown()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally
{
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow()
{
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally
{
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown()
{
return runStateAtLeast(ctl.get(), SHUTDOWN);
}
boolean isStopped()
{
return runStateAtLeast(ctl.get(), STOP);
}
public boolean isTerminating()
{
int c = ctl.get();
return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated()
{
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException
{
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
while (runStateLessThan(ctl.get(), TERMINATED))
{
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally
{
mainLock.unlock();
}
}
@Deprecated(since = "9")
protected void finalize()
{
}
public void setThreadFactory(ThreadFactory threadFactory)
{
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
public ThreadFactory getThreadFactory()
{
return threadFactory;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
{
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
public RejectedExecutionHandler getRejectedExecutionHandler()
{
return handler;
}
public void setCorePoolSize(int corePoolSize)
{
if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0)
{
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true))
{
if (workQueue.isEmpty())
break;
}
}
}
public int getCorePoolSize()
{
return corePoolSize;
}
public boolean prestartCoreThread()
{
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
void ensurePrestart()
{
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
public int prestartAllCoreThreads()
{
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
public boolean allowsCoreThreadTimeOut()
{
return allowCoreThreadTimeOut;
}
public void allowCoreThreadTimeOut(boolean value)
{
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut)
{
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
public void setMaximumPoolSize(int maximumPoolSize)
{
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public int getMaximumPoolSize()
{
return maximumPoolSize;
}
public void setKeepAliveTime(long time, TimeUnit unit)
{
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
public long getKeepAliveTime(TimeUnit unit)
{
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
public BlockingQueue<Runnable> getQueue()
{
return workQueue;
}
public boolean remove(Runnable task)
{
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
public void purge()
{
final BlockingQueue<Runnable> q = workQueue;
try
{
Iterator<Runnable> it = q.iterator();
while (it.hasNext())
{
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough)
{
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
public int getPoolSize()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally
{
mainLock.unlock();
}
}
public int getActiveCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally
{
mainLock.unlock();
}
}
public int getLargestPoolSize()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
return largestPoolSize;
} finally
{
mainLock.unlock();
}
}
public long getTaskCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
long n = completedTaskCount;
for (Worker w : workers)
{
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally
{
mainLock.unlock();
}
}
public long getCompletedTaskCount()
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally
{
mainLock.unlock();
}
}
public String toString()
{
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers)
{
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally
{
mainLock.unlock();
}
int c = ctl.get();
String runState =
isRunning(c) ? "Running" :
runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down";
return super.toString() +
"[" + runState +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
protected void beforeExecute(Thread t, Runnable r)
{
}
protected void afterExecute(Runnable r, Throwable t)
{
}
protected void terminated()
{
}
public static class CallerRunsPolicy implements RejectedExecutionHandler
{
public CallerRunsPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler
{
public AbortPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler
{
public DiscardPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler
{
public DiscardOldestPolicy()
{
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{
if (!e.isShutdown())
{
e.getQueue().poll();
e.execute(r);
}
}
}
}