跳到主要内容 深入解析 Java 线程池的开源扩展方案 | 极客日志
Java java
深入解析 Java 线程池的开源扩展方案 对比了 JDK 原生 ThreadPoolExecutor 与 Tomcat、Dubbo、Motan 三大开源框架中线程池的实现差异。重点分析了队列实现(TaskQueue、ExecutorQueue)及任务提交逻辑(offer/force/retryOffer)的定制优化。Tomcat 优先创建线程避免排队,Dubbo 支持 Eager 模式,Motan 借鉴 Tomcat 并提升队列性能。核心在于适配 Web 场景短连接高并发特性,减少请求等待时间。
道系青年 发布于 2026/3/27 更新于 2026/4/16 2 浏览开篇
Tomcat、Dubbo、Motan 的线程池并非是对 JDK 原生的简单封装,而是基于各自核心场景做了定制化源码扩展。三者均基于 JDK java.util.concurrent.ThreadPoolExecutor 扩展,核心差异在队列实现和线程调度逻辑。默认你懂 JDK 线程池的核心流程(核心线程→队列→临时线程→拒绝策略)。
一、Tomcat(9.0.41)
Tomcat 线程池 vs JDK 原生线程池
Tomcat 的核心线程池实现是 org.apache.tomcat.util.threads.ThreadPoolExecutor(注意:不是 JDK 的 java.util.concurrent.ThreadPoolExecutor),它继承自 JDK 原生类,但做了 3 个关键扩展,适配 Web 场景:
特性 JDK 原生 ThreadPoolExecutor Tomcat ThreadPoolExecutor 核心线程预启动 默认不预启动,需手动调用 prestartCoreThread() 可配置 prestartminSpareThreads,自动预启动核心线程 队列满后的处理逻辑 队列满→创建临时线程→触发拒绝策略 队列满→直接触发拒绝策略(Web 场景优先拒绝,避免请求堆积) 线程空闲销毁逻辑 核心线程默认永不销毁 可配置 minSpareThreads,核心线程低于该值时不销毁 适用场景 通用业务场景(长任务/短任务均可) Web 场景(短连接、高并发、快速响应)
技术要点
Tomcat 线程池的设计核心是'快进快出'——Web 请求都是短任务,优先保证'连接能快速被处理',而非像 JDK 线程池那样'尽量容纳任务',这是调优的核心原则。
Tomcat 线程池的核心实现类是 ThreadPoolExecutor(Tomcat 自定义),核心依赖 TaskQueue(自定义队列),以下只拆 Web 场景最关键的逻辑:
线程池的创建入口
public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor , ResizableExecutor {
protected String namePrefix = "tomcat-exec-" ;
protected int maxThreads = 200 ;
;
;
;
;
Integer.MAX_VALUE;
;
LifecycleException {
.taskqueue = ( .maxQueueSize);
( .namePrefix, .daemon, .getThreadPriority());
.executor = ( .getMinSpareThreads(), .getMaxThreads(), ( ) .maxIdleTime, TimeUnit.MILLISECONDS, .taskqueue, tf);
.executor.setThreadRenewalDelay( .threadRenewalDelay);
( .prestartminSpareThreads) {
.executor.prestartAllCoreThreads();
}
.taskqueue.setParent( .executor);
.setState(LifecycleState.STARTING);
}
}
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 Keycode 信息 查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
Escape 与 Native 编解码 JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
JavaScript / HTML 格式化 使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
JavaScript 压缩与混淆 Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
protected
int
minSpareThreads
=
25
protected
int
maxIdleTime
=
60000
protected
ThreadPoolExecutor
executor
=
null
protected
boolean
prestartminSpareThreads
=
false
protected
int
maxQueueSize
=
private
TaskQueue
taskqueue
=
null
protected
void
startInternal
()
throws
this
new
TaskQueue
this
TaskThreadFactory
tf
=
new
TaskThreadFactory
this
this
this
this
new
ThreadPoolExecutor
this
this
long
this
this
this
this
if
this
this
this
this
this
public void execute (Runnable command, long timeout, TimeUnit unit) {
this .submittedCount.incrementAndGet();
try {
super .execute(command);
} catch (RejectedExecutionException rx) {
if (!(super .getQueue() instanceof TaskQueue)) {
this .submittedCount.decrementAndGet();
throw rx;
}
TaskQueue queue = (TaskQueue) super .getQueue();
try {
if (!queue.force(command, timeout, unit)) {
this .submittedCount.decrementAndGet();
throw new RejectedExecutionException (sm.getString("threadPoolExecutor.queueFull" ));
}
} catch (InterruptedException x) {
this .submittedCount.decrementAndGet();
throw new RejectedExecutionException (x);
}
}
}
核心队列:TaskQueue(Tomcat 自定义的阻塞队列)
这是 Tomcat 线程池和 JDK 原生最核心的差异点,源码核心逻辑:
public class TaskQueue extends LinkedBlockingQueue <Runnable> {
private transient ThreadPoolExecutor executor;
public boolean force (Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (this .parent != null && !this .parent.isShutdown()) {
return super .offer(o, timeout, unit);
} else {
throw new RejectedExecutionException (sm.getString("taskQueue.notRunning" ));
}
}
public boolean offer (Runnable o) {
if (this .parent == null ) {
return super .offer(o);
} else if (this .parent.getPoolSize() == this .parent.getMaximumPoolSize()) {
return super .offer(o);
} else if (this .parent.getSubmittedCount() <= this .parent.getPoolSize()) {
return super .offer(o);
} else {
return this .parent.getPoolSize() < this .parent.getMaximumPoolSize() ? false : super .offer(o);
}
}
}
源码解读 JDK 原生线程池的逻辑是'核心线程→队列→临时线程→拒绝',而 Tomcat 通过重写 offer 方法,改成了'核心线程→临时线程(直到 maxThreads)→队列→拒绝'——这意味着 Tomcat 会优先创建线程处理请求,而非让请求排队,完美适配 Web 请求'短、快'的特性,减少请求排队等待时间。
Tomcat 线程池支持配置 prestartminSpareThreads="true",源码层面会在初始化时调用。预启动核心线程能避免'首次请求创建线程的开销'——生产中建议开启,尤其是秒杀、活动等突发流量场景。
public void prestartAllCoreThreads () {
int n = prestartCoreThread();
while (n < getCorePoolSize()) {
n += prestartCoreThread();
}
}
二、Dubbo(3.3.0) org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool#getExecutor
public Executor getExecutor (URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
TaskQueue<Runnable> taskQueue = new TaskQueue <>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor (cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory (name, true ), new AbortPolicyWithReport (name, url));
taskQueue.setExecutor(executor);
return executor;
}
org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor#execute
public void execute (Runnable command) {
if (command == null ) {
throw new NullPointerException ();
}
submittedTaskCount.incrementAndGet();
try {
super .execute(command);
} catch (RejectedExecutionException rx) {
final TaskQueue queue = (TaskQueue) super .getQueue();
try {
if (!queue.retryOffer(command, 0 , TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException ("Queue capacity is full." , rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException (x);
}
} catch (Throwable t) {
submittedTaskCount.decrementAndGet();
throw t;
}
}
org.apache.dubbo.common.threadpool.support.eager.TaskQueue#offer
public boolean offer (Runnable runnable) {
if (executor == null ) {
throw new RejectedExecutionException ("The task queue does not have executor!" );
}
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super .offer(runnable);
}
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false ;
}
return super .offer(runnable);
}
org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport#rejectedExecution
public void rejectedExecution (Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
this .threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), this .url.getProtocol(), this .url.getIp(), this .url.getPort());
logger.warn("0-1" , "too much client requesting provider" , "" , msg);
if (Boolean.parseBoolean(this .url.getParameter("dump.enable" , Boolean.TRUE.toString()))) {
this .dumpJStack();
}
this .dispatchThreadPoolExhaustedEvent(msg);
throw new RejectedExecutionException (msg);
}
三、Motan(1.2.0) com.weibo.api.motan.core.StandardThreadExecutor
继承 JDK 的线程池,设计思路借鉴于 Tomcat。
队列 ExecutorQueue,继承 LinkedTransferQueue,能保证更高性能,相比 LinkedBlockingQueue 有明显提升。
public StandardThreadExecutor (int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit, int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super (coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue (),
threadFactory, handler);
((ExecutorQueue) getQueue()).setStandardThreadExecutor(this );
submittedTasksCount = new AtomicInteger (0 );
maxSubmittedTaskCount = queueCapacity + maxThreads;
}
public void execute (Runnable command) {
int count = submittedTasksCount.incrementAndGet();
if (count > maxSubmittedTaskCount) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this );
}
try {
super .execute(command);
} catch (RejectedExecutionException rx) {
if (!((ExecutorQueue) getQueue()).force(command)) {
submittedTasksCount.decrementAndGet();
getRejectedExecutionHandler().rejectedExecution(command, this );
}
}
}
com.weibo.api.motan.core.ExecutorQueue#force
public boolean force (Runnable o) {
if (threadPoolExecutor.isShutdown()) {
throw new RejectedExecutionException ("Executor not running, can't force a command into the queue" );
}
return super .offer(o);
}
public boolean offer (Runnable o) {
int poolSize = threadPoolExecutor.getPoolSize();
if (poolSize == threadPoolExecutor.getMaximumPoolSize()) {
return super .offer(o);
}
if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {
return super .offer(o);
}
if (poolSize < threadPoolExecutor.getMaximumPoolSize()) {
return false ;
}
return super .offer(o);
}