跳到主要内容Javajava
Spring Boot 3.x 虚拟线程集成配置问题与解决方案
本文详解 Spring Boot 3.x 中虚拟线程的集成配置问题。涵盖 Web 容器(Tomcat/Jetty)配置、数据库连接池(HikariCP)兼容性、ThreadLocal 及 MDC 上下文丢失处理、异步任务 (@Async/CompletableFuture) 支持以及监控诊断方案。提供了具体的代码示例和 YAML 配置,帮助开发者在保持高并发优势的同时避免常见陷阱,实现虚拟线程在生产环境中的稳定运行。
莫名其妙2 浏览 Spring Boot 3.x 虚拟线程集成配置问题详解与解决方案
一、问题背景与概述
1.1 虚拟线程简介
虚拟线程(Virtual Threads)在 Java 19 中以预览特性引入,并在 Java 21 中正式发布。它是轻量级线程,由 JVM 而非操作系统负责调度和管理:
Thread platformThread = new Thread(() -> {
    System.out.println("Platform thread: " + Thread.currentThread());
});
Thread virtualThread = Thread.startVirtualThread(() -> {
    System.out.println("Virtual thread: " + Thread.currentThread());
});
1.2 Spring Boot 3.x 集成问题
- Tomcat/Jetty 容器配置问题
- 数据库连接池不兼容
- ThreadLocal/InheritableThreadLocal 失效
- MDC 日志上下文丢失
- 事务管理异常
- Reactive 与虚拟线程冲突
- 监控和调试困难
二、Web 容器虚拟线程配置问题
2.1 Tomcat 虚拟线程配置
# Platform-thread pool sizing only; nothing here turns on virtual threads.
server:
  tomcat:
    threads:
      max: 200
      min-spare: 10
WARNING: Virtual threads not enabled for Tomcat
解决方案:
/**
 * Runs Tomcat request processing on virtual threads and applies connector/context tuning.
 */
@Configuration
public class TomcatVirtualThreadConfig {

    /**
     * Replaces the protocol handler's executor with a virtual-thread-per-task executor and, for
     * HTTP/1.1 handlers, applies connection limits and timeouts.
     *
     * @return the protocol handler customizer
     */
    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
            if (protocolHandler instanceof AbstractHttp11Protocol<?> http11) {
                http11.setMaxConnections(10000);
                // NOTE(review): Tomcat documents maxThreads as ignored once an external executor
                // is set — confirm whether this line is still wanted.
                http11.setMaxThreads(1000);
                http11.setConnectionTimeout(30000);
                http11.setKeepAliveTimeout(60000);
                http11.setMaxKeepAliveRequests(100);
            }
        };
    }

    /**
     * Applies connector properties and servlet-context settings to the embedded Tomcat factory.
     *
     * <p>Only attributes documented for the Tomcat 10.1 HTTP connector are set; the former
     * {@code acceptorThreadCount} and {@code virtualThreadKeepAlive} entries were dropped because
     * the connector does not define them.
     *
     * @return the web server factory customizer
     */
    @Bean
    public WebServerFactoryCustomizer<TomcatServletWebServerFactory> tomcatCustomizer() {
        return factory -> {
            factory.addConnectorCustomizers(connector -> {
                connector.setProperty("maxThreads", "1000");
                connector.setProperty("minSpareThreads", "5");
                connector.setProperty("useVirtualThreads", "true");
            });
            factory.addContextCustomizers(context -> {
                context.setSessionTimeout(30);
                context.setBackgroundProcessorDelay(10);
            });
        };
    }
}
/**
 * Builds the embedded Tomcat factory with a named virtual-thread executor and HTTP/1.1 tuning.
 */
@Configuration
@ConditionalOnWebApplication(type = Type.SERVLET)
public class FullTomcatVirtualThreadConfig {

    // NOTE(review): bound from configuration but not applied anywhere in this class.
    @Value("${server.tomcat.virtual-threads.max-pool-size:1000}")
    private int maxPoolSize;

    // NOTE(review): bound from configuration but not applied anywhere in this class.
    @Value("${server.tomcat.virtual-threads.keep-alive-seconds:60}")
    private int keepAliveSeconds;

    /**
     * Creates the Tomcat factory and configures every HTTP/1.1 connector for virtual threads.
     *
     * @return the servlet web server factory
     */
    @Bean
    public ServletWebServerFactory servletContainer() {
        TomcatServletWebServerFactory factory = new TomcatServletWebServerFactory();
        factory.addConnectorCustomizers(connector -> {
            ProtocolHandler handler = connector.getProtocolHandler();
            if (handler instanceof AbstractHttp11Protocol<?> http11) {
                configureForVirtualThreads(http11);
            }
        });
        return factory;
    }

    /**
     * Installs the virtual-thread executor and sets connection, upload and compression options.
     *
     * <p>TLS is deliberately not enabled here: forcing {@code SSLEnabled} without a configured
     * certificate prevents the connector from starting. Configure TLS through Spring Boot's
     * {@code server.ssl.*} properties instead.
     */
    private void configureForVirtualThreads(AbstractHttp11Protocol<?> protocol) {
        ExecutorService virtualThreadExecutor = createVirtualThreadExecutor();
        protocol.setExecutor(virtualThreadExecutor);
        protocol.setMaxConnections(10000);
        protocol.setConnectionTimeout(30000);
        protocol.setConnectionUploadTimeout(60000);
        protocol.setDisableUploadTimeout(false);
        protocol.setKeepAliveTimeout(60000);
        protocol.setMaxKeepAliveRequests(100);
        protocol.setMaxSwallowSize(2097152);
        protocol.setUseSendfile(true);
        protocol.setCompression("on");
        protocol.setCompressionMinSize(2048);
        // Tomcat 10.x spells this setter "compressible".
        protocol.setCompressibleMimeType("text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml");
    }

    /** Creates a thread-per-task executor whose virtual threads are named {@code tomcat-virtual-N}. */
    private ExecutorService createVirtualThreadExecutor() {
        ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("tomcat-virtual-", 0).factory();
        return Executors.newThreadPerTaskExecutor(virtualThreadFactory);
    }
}
server:
  port: 8080
  tomcat:
    # NOTE(review): "virtual-threads" and "keep-alive" look like application-defined keys
    # rather than built-in Spring Boot properties — confirm something binds them.
    virtual-threads:
      enabled: true
      max-pool-size: 1000
      keep-alive-seconds: 60
      thread-name-prefix: "tomcat-vt-"
    connection-timeout: 30s
    max-connections: 10000
    keep-alive:
      timeout: 60s
      max-requests: 100
    accesslog:
      enabled: true
      pattern: "%t %a %r %s %D %{User-Agent}i"
  compression:
    enabled: true
    min-response-size: 2KB
    mime-types:
      - text/html
      - text/xml
      - text/plain
      - text/css
      - application/json
2.2 Jetty 虚拟线程配置
/**
 * Configures embedded Jetty to dispatch work to virtual threads and tunes its connectors.
 */
@Configuration
@ConditionalOnClass(org.eclipse.jetty.server.Server.class)
public class JettyVirtualThreadConfig {

    /**
     * Creates the Jetty factory with a {@code QueuedThreadPool} that delegates to a
     * virtual-thread executor, then tunes each {@code ServerConnector}.
     *
     * @return the configured web server factory
     */
    @Bean
    public ConfigurableServletWebServerFactory webServerFactory() {
        JettyServletWebServerFactory factory = new JettyServletWebServerFactory();

        QueuedThreadPool threadPool = new QueuedThreadPool();
        threadPool.setVirtualThreadsExecutor(Executors.newVirtualThreadPerTaskExecutor());
        threadPool.setMaxThreads(1000);
        threadPool.setMinThreads(10);
        threadPool.setIdleTimeout(60000);
        // A Jetty Server receives its pool at construction time, so hand it to the factory
        // instead of trying to swap it on the already-built server.
        factory.setThreadPool(threadPool);

        factory.addServerCustomizers(server -> {
            for (Connector connector : server.getConnectors()) {
                if (connector instanceof ServerConnector serverConnector) {
                    configureJettyConnector(serverConnector);
                }
            }
        });
        return factory;
    }

    /**
     * Applies HTTP header limits, queue/idle settings and, when TLS is present, extra cipher
     * exclusions to one connector.
     */
    private void configureJettyConnector(ServerConnector connector) {
        HttpConnectionFactory httpFactory = connector.getConnectionFactory(HttpConnectionFactory.class);
        if (httpFactory != null) { // absent on connectors that do not speak HTTP/1.1
            HttpConfiguration httpConfig = httpFactory.getHttpConfiguration();
            httpConfig.setRequestHeaderSize(8192);
            httpConfig.setResponseHeaderSize(8192);
            httpConfig.setSendServerVersion(false);
            httpConfig.setSendDateHeader(true);
            httpConfig.setHeaderCacheSize(512);
        }
        connector.setAcceptQueueSize(1024);
        connector.setIdleTimeout(30000);

        SslConnectionFactory sslConnFactory = connector.getConnectionFactory(SslConnectionFactory.class);
        if (sslConnFactory != null) {
            SslContextFactory.Server sslContextFactory = (SslContextFactory.Server) sslConnFactory.getSslContextFactory();
            // Add to the existing exclusions rather than replacing them.
            sslContextFactory.addExcludeCipherSuites("SSL_RSA_WITH_DES_CBC_SHA","SSL_DHE_RSA_WITH_DES_CBC_SHA","SSL_DHE_DSS_WITH_DES_CBC_SHA");
        }
    }
}
三、数据库连接池问题
3.1 HikariCP 虚拟线程兼容性问题
WARNING: HikariPool-1 - Connection is not available, request timed out after 30000ms.
解决方案:
/**
 * Declares the HikariCP data source used by the application.
 */
@Configuration
public class HikariVirtualThreadConfig {

    /**
     * Builds a {@code HikariDataSource} from the standard datasource properties plus fixed pool
     * sizing, timeout and driver settings.
     *
     * @param properties connection URL, credentials and driver class
     * @return the pooled data source
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public HikariDataSource dataSource(DataSourceProperties properties) {
        HikariConfig hikari = new HikariConfig();

        // Connection identity.
        hikari.setDriverClassName(properties.getDriverClassName());
        hikari.setJdbcUrl(properties.getUrl());
        hikari.setUsername(properties.getUsername());
        hikari.setPassword(properties.getPassword());

        // Pool sizing.
        hikari.setMinimumIdle(10);
        hikari.setMaximumPoolSize(100);

        // Timeouts, all in milliseconds.
        hikari.setConnectionTimeout(30000);
        hikari.setKeepaliveTime(30000);
        hikari.setIdleTimeout(600000);
        hikari.setMaxLifetime(1800000);
        hikari.setLeakDetectionThreshold(60000);

        // Properties handed through to the JDBC driver.
        hikari.addDataSourceProperty("useVirtualThreads", "true");
        hikari.addDataSourceProperty("cachePrepStmts", true);
        hikari.addDataSourceProperty("prepStmtCacheSize", 250);
        hikari.addDataSourceProperty("prepStmtCacheSqlLimit", 2048);
        hikari.addDataSourceProperty("useServerPrepStmts", true);

        // No metric or health-check registry is attached.
        hikari.setMetricRegistry(null);
        hikari.setHealthCheckRegistry(null);

        return new HikariDataSource(hikari);
    }
}
/**
 * {@link DataSource} decorator that acquires no-argument connections on a virtual thread and
 * forwards everything else to the wrapped data source.
 */
@Component
public class VirtualThreadAwareDataSource implements DataSource {

    private final DataSource delegate;
    private final ExecutorService virtualThreadExecutor;

    /**
     * @param delegate the data source that actually provides connections
     */
    public VirtualThreadAwareDataSource(DataSource delegate) {
        this.delegate = delegate;
        this.virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * Obtains a connection from the delegate on a virtual thread and waits for it.
     *
     * @throws SQLException the delegate's own exception, or a wrapper if the wait was
     *     interrupted or the task failed for another reason
     */
    @Override
    public Connection getConnection() throws SQLException {
        try {
            // Lambda rather than a method reference: getConnection is overloaded.
            return virtualThreadExecutor.submit(() -> delegate.getConnection()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new SQLException("Failed to get connection in virtual thread", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof SQLException sqlException) {
                throw sqlException; // surface the real driver/pool error
            }
            throw new SQLException("Failed to get connection in virtual thread", e);
        }
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return delegate.getConnection(username, password);
    }

    @Override
    public java.io.PrintWriter getLogWriter() throws SQLException {
        return delegate.getLogWriter();
    }

    @Override
    public void setLogWriter(java.io.PrintWriter out) throws SQLException {
        delegate.setLogWriter(out);
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        delegate.setLoginTimeout(seconds);
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return delegate.getLoginTimeout();
    }

    @Override
    public java.util.logging.Logger getParentLogger() throws java.sql.SQLFeatureNotSupportedException {
        return delegate.getParentLogger();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return iface.cast(this);
        }
        return delegate.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface.isInstance(this) || delegate.isWrapperFor(iface);
    }

    /** Stops accepting new connection-acquisition tasks. */
    @PreDestroy
    public void shutdown() {
        virtualThreadExecutor.shutdown();
    }
}
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/db?useSSL=false&serverTimezone=UTC
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
    hikari:
      maximum-pool-size: 50
      minimum-idle: 5
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      pool-name: VirtualThreadPool
      leak-detection-threshold: 60000
      connection-test-query: SELECT 1
      # Passed through to the JDBC driver as connection properties.
      data-source-properties:
        useVirtualThreads: true
        cachePrepStmts: true
        prepStmtCacheSize: 250
        prepStmtCacheSqlLimit: 2048
        useServerPrepStmts: true
        useLocalSessionState: true
        rewriteBatchedStatements: true
        maintainTimeStats: false
        cacheResultSetMetadata: true
        cacheServerConfiguration: true
        elideSetAutoCommits: true
3.2 事务管理问题
/**
 * Transaction infrastructure: a JPA transaction manager, a programmatic template and a
 * name-pattern based interceptor.
 */
@Configuration
@EnableTransactionManagement
public class VirtualThreadTransactionConfig {

    /**
     * @param entityManagerFactory the JPA factory whose transactions are managed
     * @return a JPA transaction manager with nested transactions allowed, existing-transaction
     *     validation on and global rollback on participation failure off
     */
    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        JpaTransactionManager jpaManager = new JpaTransactionManager();
        jpaManager.setEntityManagerFactory(entityManagerFactory);
        jpaManager.setGlobalRollbackOnParticipationFailure(false);
        jpaManager.setValidateExistingTransaction(true);
        jpaManager.setNestedTransactionAllowed(true);
        return jpaManager;
    }

    /**
     * @param transactionManager the manager the template delegates to
     * @return a read-write template using REQUIRED propagation, READ_COMMITTED isolation and a
     *     timeout of 30
     */
    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
        TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
        txTemplate.setReadOnly(false);
        txTemplate.setTimeout(30);
        txTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
        txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        return txTemplate;
    }

    /**
     * @param transactionManager the manager used by the interceptor
     * @return an interceptor mapping {@code get*}/{@code find*} to read-only SUPPORTS and all
     *     other methods to REQUIRED with rollback on {@code Exception}
     */
    @Bean
    public TransactionInterceptor transactionInterceptor(PlatformTransactionManager transactionManager) {
        Properties rules = new Properties();
        rules.setProperty("get*", "PROPAGATION_SUPPORTS,readOnly");
        rules.setProperty("find*", "PROPAGATION_SUPPORTS,readOnly");
        rules.setProperty("*", "PROPAGATION_REQUIRED,-Exception");

        NameMatchTransactionAttributeSource attributeSource = new NameMatchTransactionAttributeSource();
        attributeSource.setProperties(rules);

        TransactionInterceptor txInterceptor = new TransactionInterceptor();
        txInterceptor.setTransactionManager(transactionManager);
        txInterceptor.setTransactionAttributeSource(attributeSource);
        return txInterceptor;
    }
}
四、ThreadLocal 和上下文传递问题
4.1 MDC 日志上下文丢失问题
/**
 * Demonstrates the problem case: the MDC entry is written on the calling thread, while the log
 * statement runs on a freshly started virtual thread.
 */
@Slf4j
@Component
public class LoggingService {

    /** Stores a random request id in the MDC, then logs from a new virtual thread. */
    public void process() {
        String requestId = UUID.randomUUID().toString();
        MDC.put("requestId", requestId);

        Runnable logTask = () -> {
            log.info("Processing in virtual thread");
        };
        Thread.startVirtualThread(logTask);
    }
}
解决方案:
/**
 * Helpers that copy the caller's SLF4J MDC onto virtual threads and clear it when the task ends.
 */
@Slf4j
@Component
public class VirtualThreadMDC {

    /** One shared executor; creating a new one per call would leave executors unclosed. */
    private static final ExecutorService VIRTUAL_THREAD_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();

    /**
     * Runs {@code task} on a new virtual thread with a copy of the caller's MDC.
     *
     * @param task the work to run
     */
    public static void wrap(Runnable task) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        Thread.ofVirtual().start(() -> {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                task.run();
            } finally {
                MDC.clear();
            }
        });
    }

    /**
     * Evaluates {@code supplier} on a virtual thread with a copy of the caller's MDC.
     *
     * @param supplier produces the result
     * @param <T> result type
     * @return a future completed with the supplier's result or failure
     */
    public static <T> CompletableFuture<T> wrapAsync(Supplier<T> supplier) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                return supplier.get();
            } finally {
                MDC.clear();
            }
        }, VIRTUAL_THREAD_EXECUTOR);
    }

    /** @return a new fluent wrapper for running a task with extra MDC attributes */
    public static StructuredTaskWrapper structured() {
        return new StructuredTaskWrapper();
    }

    /**
     * Fluent builder that collects key/value attributes and runs a task on a virtual thread with
     * the caller's MDC plus those attributes.
     */
    public static class StructuredTaskWrapper {

        private final Map<String, Object> attributes = new HashMap<>();

        /**
         * Records an attribute to expose in the task's MDC.
         *
         * @return this wrapper, for chaining
         */
        public StructuredTaskWrapper attribute(String key, Object value) {
            attributes.put(key, value);
            return this;
        }

        /**
         * Runs {@code task} on a new virtual thread; failures are logged, not rethrown.
         *
         * @param task the work to run
         */
        public void run(Runnable task) {
            Map<String, String> mdcContext = MDC.getCopyOfContextMap();
            // Snapshot so later attribute() calls cannot race with the running task.
            Map<String, Object> extra = new HashMap<>(attributes);
            Thread.startVirtualThread(() -> {
                if (mdcContext != null) {
                    MDC.setContextMap(mdcContext);
                }
                extra.forEach((key, value) -> MDC.put(key, String.valueOf(value)));
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("Virtual thread task failed", e);
                } finally {
                    MDC.clear();
                }
            });
        }
    }
}
/**
 * Demonstrates logging from (nested) virtual threads while keeping the caller's MDC entries.
 */
@Slf4j
@Service
public class LoggingService {

    /**
     * Puts a request id and user id into the MDC, then logs from a virtual thread and from a
     * second virtual thread started inside the first.
     */
    public void processWithContext() {
        MDC.put("requestId", "req-123");
        MDC.put("userId", "user-456");
        // wrap is static, so it is called on the class rather than through an injected bean.
        VirtualThreadMDC.wrap(() -> {
            log.info("Processing with preserved MDC context");
            VirtualThreadMDC.wrap(() -> {
                log.info("Nested virtual thread with context");
            });
        });
    }
}
/**
 * Runs methods annotated with {@code @WithVirtualThread} on a virtual thread, carrying the
 * caller's MDC and adding {@code method} and {@code class} entries.
 */
@Aspect
@Component
public class VirtualThreadMDCAspect {

    /**
     * Executes the join point on a new virtual thread and blocks until it finishes.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the intercepted method returns
     * @throws Throwable the exception thrown by the intercepted method, unwrapped so callers see
     *     the original type
     */
    @Around("@annotation(WithVirtualThread)")
    public Object manageMDC(ProceedingJoinPoint joinPoint) throws Throwable {
        Map<String, String> context = MDC.getCopyOfContextMap();
        CompletableFuture<Object> outcome = new CompletableFuture<>();
        Thread.ofVirtual().start(() -> {
            try {
                if (context != null) {
                    MDC.setContextMap(context);
                }
                MethodSignature signature = (MethodSignature) joinPoint.getSignature();
                MDC.put("method", signature.getMethod().getName());
                MDC.put("class", joinPoint.getTarget().getClass().getSimpleName());
                outcome.complete(joinPoint.proceed());
            } catch (Throwable e) {
                outcome.completeExceptionally(e);
            } finally {
                MDC.clear();
            }
        });
        try {
            return outcome.get();
        } catch (ExecutionException e) {
            throw e.getCause(); // rethrow what the advised method actually threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
/**
 * Marks a method for the {@code @annotation(WithVirtualThread)} pointcut.
 * Applicable to methods only and retained at runtime.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WithVirtualThread {
// Optional free-form value, empty by default.
// NOTE(review): no code visible here reads it — confirm its purpose.
String context() default "";
}
/**
 * Example service whose method is annotated with {@code @WithVirtualThread}.
 */
@Slf4j
@Service
public class AsyncService {

    /**
     * Logs and returns an already-completed future.
     *
     * @param input the value to process
     * @return a completed future holding {@code "Processed: " + input}
     */
    @WithVirtualThread
    public CompletableFuture<String> processAsync(String input) {
        log.info("Processing in virtual thread with MDC");
        return CompletableFuture.completedFuture("Processed: " + input);
    }
}
4.2 请求上下文传递问题
/**
 * Helpers that carry the caller's Spring {@code RequestAttributes} onto other threads.
 *
 * <p>NOTE(review): the attributes object belongs to the originating request; confirm tasks do
 * not outlive that request.
 */
@Component
public class VirtualThreadRequestContext {

    /** One shared executor instead of a new, never-closed executor per call. */
    private static final ExecutorService VIRTUAL_THREAD_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();

    /**
     * Wraps {@code task} so the attributes captured now are installed while it runs and reset
     * afterwards.
     *
     * @param task the work to wrap
     * @return a runnable that can be executed on any thread
     */
    public static Runnable wrapWithContext(Runnable task) {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        return () -> {
            try {
                if (attributes != null) {
                    RequestContextHolder.setRequestAttributes(attributes, true);
                }
                task.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }

    /**
     * Evaluates {@code supplier} on a virtual thread with the caller's request attributes.
     *
     * @param supplier produces the result
     * @param <T> result type
     * @return a future completed with the supplier's result or failure
     */
    public static <T> CompletableFuture<T> supplyAsyncWithContext(Supplier<T> supplier) {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (attributes != null) {
                    RequestContextHolder.setRequestAttributes(attributes, true);
                }
                return supplier.get();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }, VIRTUAL_THREAD_EXECUTOR);
    }

    /** @return a new scope bound to the caller's current request attributes */
    public static VirtualThreadScope createScope() {
        return new VirtualThreadScope();
    }

    /**
     * Holds per-scope objects keyed by type and runs tasks on virtual threads with the request
     * attributes captured when the scope was created.
     */
    public static class VirtualThreadScope implements AutoCloseable {

        // Concurrent map: touched by both the creating thread and the spawned virtual thread.
        private final Map<Class<?>, Object> beans = new ConcurrentHashMap<>();
        private final RequestAttributes originalAttributes;

        public VirtualThreadScope() {
            this.originalAttributes = RequestContextHolder.getRequestAttributes();
        }

        /**
         * Returns the scoped instance for {@code type}, creating it on first use.
         *
         * @param type lookup key and result type
         * @param creator called once when no instance exists yet
         */
        public <T> T getOrCreate(Class<T> type, Supplier<T> creator) {
            return type.cast(beans.computeIfAbsent(type, k -> creator.get()));
        }

        /**
         * Runs {@code task} on a new virtual thread; the scoped objects are cleared when it ends.
         */
        public void runInScope(Runnable task) {
            Thread.startVirtualThread(() -> {
                try {
                    if (originalAttributes != null) {
                        RequestContextHolder.setRequestAttributes(originalAttributes, true);
                    }
                    task.run();
                } finally {
                    RequestContextHolder.resetRequestAttributes();
                    beans.clear();
                }
            });
        }

        /** Discards all scoped objects. */
        @Override
        public void close() {
            beans.clear();
        }
    }
}
/**
 * MVC interceptor that stashes the current request attributes on the request itself.
 */
@Component
public class VirtualThreadContextInterceptor implements AsyncHandlerInterceptor {

    /**
     * Stores the thread-bound request attributes under {@code "virtualThreadContext"}.
     *
     * @return always {@code true}, so handler execution continues
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        var currentAttributes = RequestContextHolder.getRequestAttributes();
        request.setAttribute("virtualThreadContext", currentAttributes);
        return true;
    }

    /** Intentionally empty: nothing is done when concurrent handling starts. */
    @Override
    public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // no-op
    }
}
/**
 * Registers the virtual-thread context interceptor with Spring MVC.
 */
@Configuration
public class WebMvcVirtualThreadConfig implements WebMvcConfigurer {

    @Autowired
    private VirtualThreadContextInterceptor contextInterceptor;

    /** Applies the interceptor to every path under {@code /api/}. */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        var registration = registry.addInterceptor(contextInterceptor);
        registration.addPathPatterns("/api/**");
    }
}
五、异步任务与虚拟线程集成
5.1 @Async 虚拟线程配置
@Configuration
@EnableAsync
public class VirtualThreadAsyncConfig implements AsyncConfigurer {
@Value("${async.virtual-threads.core-pool-size:100}")
private int corePoolSize;
@Value("${async.virtual-threads.max-pool-size:1000}")
private int maxPoolSize;
@Value("${async.virtual-threads.keep-alive-seconds:60}")
private int keepAliveSeconds;
@Override
public Executor getAsyncExecutor() {
ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("async-vt-", 0).factory();
ExecutorService executor = Executors.newThreadPerTaskExecutor(virtualThreadFactory);
return new DelegatingExecutor(executor) {
@Override
public void execute(Runnable task) {
Runnable wrappedTask = VirtualThreadRequestContext.wrapWithContext(task);
super.execute(wrappedTask);
}
};
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.error("Async method {} failed", method.getName(), ex);
if (ex instanceof VirtualThreadException) {
handleVirtualThreadException((VirtualThreadException) ex);
}
};
}
@Bean(name = "virtualThreadTaskExecutor")
public TaskExecutor virtualThreadTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadFactory(Thread.ofVirtual().name("task-vt-", 0).factory());
executor.setCorePoolSize(1);
executor.setMaxPoolSize(Integer.MAX_VALUE);
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.setThreadNamePrefix("task-vt-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
@Bean
public TaskScheduler virtualThreadTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadFactory(Thread.ofVirtual().name("scheduler-vt-", 0).factory());
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("scheduler-vt-");
scheduler.setAwaitTerminationSeconds(30);
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.initialize();
return scheduler;
}
}
/**
 * Example service combining {@code @Async} with the virtual-thread task executor.
 */
@Slf4j
@Service
public class AsyncVirtualThreadService {

    /**
     * Simulates one second of blocking work and returns the processed value.
     *
     * @param input the value to process
     * @return a completed future holding {@code "Processed: " + input}
     */
    @Async("virtualThreadTaskExecutor")
    @VirtualThreadContext
    public CompletableFuture<String> processAsync(String input) {
        log.info("Processing in virtual thread: {}", input);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CompletableFuture.completedFuture("Processed: " + input);
    }

    /**
     * Processes every input and collects the results in input order.
     *
     * <p>NOTE(review): {@code this::processAsync} is a self-invocation. With proxy-based Spring
     * AOP such calls normally bypass {@code @Async}, so the items may run one after another on
     * the calling thread — confirm, and call through the proxied bean if parallelism is intended.
     *
     * @param inputs the values to process
     * @return a future holding one result per input
     */
    public CompletableFuture<List<String>> processBatch(List<String> inputs) {
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(this::processAsync)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
/**
 * Runs methods annotated with {@code @VirtualThreadContext} on a virtual thread, carrying over
 * the caller's MDC and request attributes.
 */
@Aspect
@Component
public class VirtualThreadContextAspect {

    /**
     * Executes the join point on a new virtual thread and blocks until it finishes.
     *
     * @param joinPoint the intercepted invocation
     * @param virtualThreadContext the annotation instance bound by the pointcut
     * @return whatever the intercepted method returns
     * @throws Throwable the exception thrown by the intercepted method, unwrapped so callers see
     *     the original type
     */
    @Around("@annotation(virtualThreadContext)")
    public Object handleVirtualThreadContext(ProceedingJoinPoint joinPoint, VirtualThreadContext virtualThreadContext) throws Throwable {
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        CompletableFuture<Object> outcome = new CompletableFuture<>();
        Thread.ofVirtual().start(() -> {
            try {
                if (mdcContext != null) {
                    MDC.setContextMap(mdcContext);
                }
                if (requestAttributes != null) {
                    RequestContextHolder.setRequestAttributes(requestAttributes, true);
                }
                outcome.complete(joinPoint.proceed());
            } catch (Throwable e) {
                outcome.completeExceptionally(e);
            } finally {
                MDC.clear();
                RequestContextHolder.resetRequestAttributes();
            }
        });
        try {
            return outcome.get();
        } catch (ExecutionException e) {
            throw e.getCause(); // rethrow what the advised method actually threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }
}
5.2 CompletableFuture 虚拟线程支持
/**
 * {@link CompletableFuture} helpers that run work on named virtual threads and propagate the
 * submitting thread's MDC.
 */
@Slf4j
@Component
public class VirtualThreadCompletableFuture {

    private final ExecutorService virtualThreadExecutor;

    public VirtualThreadCompletableFuture() {
        this.virtualThreadExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("cf-vt-", 0).factory());
    }

    /**
     * Evaluates {@code supplier} on a virtual thread.
     *
     * @param supplier produces the result
     * @param <T> result type
     * @return a future completed with the supplier's result or failure
     */
    public <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        // Capture on the submitting thread; inside the lambda it would read the worker's empty MDC.
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (mdcContext != null) {
                    MDC.setContextMap(mdcContext);
                }
                return supplier.get();
            } finally {
                MDC.clear();
            }
        }, virtualThreadExecutor);
    }

    /**
     * Runs {@code runnable} on a virtual thread.
     *
     * @param runnable the work to run
     * @return a future completed when the work finishes or fails
     */
    public CompletableFuture<Void> runAsync(Runnable runnable) {
        // Capture on the submitting thread, as in supplyAsync.
        Map<String, String> mdcContext = MDC.getCopyOfContextMap();
        return CompletableFuture.runAsync(() -> {
            try {
                if (mdcContext != null) {
                    MDC.setContextMap(mdcContext);
                }
                runnable.run();
            } finally {
                MDC.clear();
            }
        }, virtualThreadExecutor);
    }

    /**
     * Runs all suppliers concurrently and gathers their results in input order.
     *
     * @param suppliers the computations to run
     * @param <T> result type
     * @return a future holding one result per supplier; it fails if any supplier fails
     */
    public <T> CompletableFuture<List<T>> allOf(List<Supplier<T>> suppliers) {
        List<CompletableFuture<T>> futures = suppliers.stream()
                .map(this::supplyAsync)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    /**
     * Like {@link #supplyAsync(Supplier)} but gives up after the timeout.
     *
     * @return a future that yields {@code null} on timeout and otherwise the result or the
     *     original failure
     */
    public <T> CompletableFuture<T> supplyAsyncWithTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
        return supplyAsync(supplier)
                .orTimeout(timeout, unit)
                .exceptionally(ex -> {
                    if (ex instanceof TimeoutException) {
                        log.warn("Virtual thread task timeout after {} {}", timeout, unit);
                        return null;
                    }
                    // Avoid wrapping an existing CompletionException a second time.
                    throw (ex instanceof CompletionException wrapped) ? wrapped : new CompletionException(ex);
                });
    }

    /** Shuts the executor down, waiting up to 30 seconds before forcing termination. */
    @PreDestroy
    public void shutdown() {
        virtualThreadExecutor.shutdown();
        try {
            if (!virtualThreadExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                virtualThreadExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            virtualThreadExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
六、监控和诊断配置
6.1 虚拟线程监控
@Component
@Slf4j
public class VirtualThreadMonitor {
private final ScheduledExecutorService monitorScheduler;
private final Map<String, VirtualThreadStats> threadStats = new ConcurrentHashMap<>();
public VirtualThreadMonitor() {
this.monitorScheduler = Executors.newScheduledThreadPool(1);
}
@PostConstruct
public void startMonitoring() {
monitorScheduler.scheduleAtFixedRate(() -> {
try {
collectVirtualThreadStats();
logVirtualThreadMetrics();
checkForProblems();
} catch (Exception e) {
log.error("Failed to collect virtual thread stats", e);
}
}, 5, 5, TimeUnit.SECONDS);
registerJmxBean();
}
private void collectVirtualThreadStats() {
Thread.getAllStackTraces().forEach((thread, stackTrace) -> {
if (thread.isVirtual()) {
String threadName = thread.getName();
VirtualThreadStats stats = threadStats.computeIfAbsent(
threadName, k -> new VirtualThreadStats());
stats.update(thread, stackTrace);
}
});
cleanupOldStats();
}
private void logVirtualThreadMetrics() {
long virtualThreadCount = threadStats.size();
long pinnedThreads = threadStats.values().stream()
.filter(VirtualThreadStats::isPinned)
.count();
double avgCpuTime = threadStats.values().stream()
.mapToLong(VirtualThreadStats::getCpuTime)
.average()
.orElse(0);
log.info("Virtual Thread Metrics - Count: {}, Pinned: {}, Avg CPU: {}ms", virtualThreadCount, pinnedThreads, avgCpuTime);
if (log.isDebugEnabled()) {
threadStats.forEach((name, stats) -> {
log.debug("Thread {}: state={}, cpu={}ms, user={}ms", name, stats.getState(), stats.getCpuTime(), stats.getUserTime());
});
}
}
private void checkForProblems() {
threadStats.entrySet().stream()
.filter(entry -> entry.getValue().isPinned())
.forEach(entry -> {
log.warn("Virtual thread {} is pinned! Stack trace:\n{}", entry.getKey(), entry.getValue().getStackTrace());
});
threadStats.entrySet().stream()
.filter(entry -> entry.getValue().getCpuTime() > 1000)
.forEach(entry -> {
log.warn("Virtual thread {} using high CPU: {}ms", entry.getKey(), entry.getValue().getCpuTime());
});
}
private void registerJmxBean() {
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("com.example:type=VirtualThreadMonitor");
mbs.registerMBean(new VirtualThreadMonitorMXBean() {
@Override
public long getVirtualThreadCount() {
return threadStats.size();
}
@Override
public long getPinnedThreadCount() {
return threadStats.values().stream()
.filter(VirtualThreadStats::isPinned)
.count();
}
@Override
public Map<String, String> getThreadStates() {
return threadStats.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getState().toString()));
}
}, name);
} catch (Exception e) {
log.error("Failed to register JMX bean", e);
}
}
@PreDestroy
public void stopMonitoring() {
monitorScheduler.shutdown();
}
private static class VirtualThreadStats {
private Thread.State state;
private long cpuTime;
private long userTime;
private boolean pinned;
private StackTraceElement[] stackTrace;
private long lastUpdateTime;
void update(Thread thread, StackTraceElement[] stackTrace) {
this.state = thread.getState();
this.cpuTime = thread.isAlive() ? ManagementFactory.getThreadMXBean().getThreadCpuTime(thread.getId()) / 1_000_000 : 0;
this.userTime = thread.isAlive() ? ManagementFactory.getThreadMXBean().getThreadUserTime(thread.getId()) / 1_000_000 : 0;
this.pinned = isThreadPinned(thread);
this.stackTrace = stackTrace;
this.lastUpdateTime = System.currentTimeMillis();
}
private boolean isThreadPinned(Thread thread) {
return thread.getState() == Thread.State.RUNNABLE && cpuTime > 100;
}
}
public interface VirtualThreadMonitorMXBean {
long getVirtualThreadCount();
long getPinnedThreadCount();
Map<String, String> getThreadStates();
}
}
6.2 Micrometer 监控集成
/**
 * Micrometer wiring: thread-count gauges, a creation counter, a CPU-time timer, and the
 * {@code @Timed}/{@code @Counted} aspects.
 */
@Configuration
public class VirtualThreadMetricsConfig {

    /**
     * Registers gauges for virtual and platform thread counts, one gauge per {@code Thread.State}
     * for virtual threads, plus a creation counter and a CPU-time timer.
     *
     * @return the meter binder
     */
    @Bean
    public MeterBinder virtualThreadMeterBinder() {
        return registry -> {
            Gauge.builder("jvm.virtual.threads.count", VirtualThreadMetricsConfig::countVirtualThreads)
                    .description("Number of virtual threads")
                    .tag("type", "virtual")
                    .register(registry);

            Gauge.builder("jvm.platform.threads.count", VirtualThreadMetricsConfig::countPlatformThreads)
                    .description("Number of platform threads")
                    .tag("type", "platform")
                    .register(registry);

            Thread.State[] allStates = Thread.State.values();
            for (int i = 0; i < allStates.length; i++) {
                final Thread.State current = allStates[i];
                Gauge.builder("jvm.virtual.threads.state", () -> countVirtualThreadsIn(current))
                        .description("Virtual threads in state " + current)
                        .tag("state", current.name().toLowerCase())
                        .register(registry);
            }

            Counter.builder("jvm.virtual.threads.created")
                    .description("Total virtual threads created")
                    .register(registry);

            Timer.builder("jvm.virtual.threads.cpu.time")
                    .description("CPU time used by virtual threads")
                    .publishPercentiles(0.5, 0.95, 0.99)
                    .register(registry);
        };
    }

    /** Counts virtual threads in the current {@code Thread.getAllStackTraces()} snapshot. */
    private static Number countVirtualThreads() {
        long total = 0;
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (candidate.isVirtual()) {
                total++;
            }
        }
        return total;
    }

    /** Counts non-virtual threads in the current snapshot. */
    private static Number countPlatformThreads() {
        long total = 0;
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (!candidate.isVirtual()) {
                total++;
            }
        }
        return total;
    }

    /** Counts virtual threads in the current snapshot that are in {@code wanted} state. */
    private static Number countVirtualThreadsIn(Thread.State wanted) {
        long total = 0;
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (candidate.isVirtual() && candidate.getState() == wanted) {
                total++;
            }
        }
        return total;
    }

    /** Enables {@code @Timed} on Spring beans. */
    @Bean
    public TimedAspect timedAspect(MeterRegistry registry) {
        return new TimedAspect(registry);
    }

    /** Enables {@code @Counted} on Spring beans. */
    @Bean
    public CountedAspect countedAspect(MeterRegistry registry) {
        return new CountedAspect(registry);
    }
}
/**
 * Counts virtual threads created through its factory and records how long each one lived.
 */
@Component
public class VirtualThreadMetricsCollector {

    /** Creation timestamp ({@code System.nanoTime()}) per thread id, removed on completion. */
    private final Map<Long, Long> threadStartTimes = new ConcurrentHashMap<>();
    private final Timer lifetimeTimer;
    private final ThreadFactory monitoredFactory;

    /**
     * @param meterRegistry registry receiving the {@code jvm.virtual.threads.created} counter and
     *     the {@code jvm.virtual.threads.lifetime} timer
     */
    public VirtualThreadMetricsCollector(MeterRegistry meterRegistry) {
        // A single untagged counter: tagging by thread name would create one series per thread.
        Counter created = Counter.builder("jvm.virtual.threads.created")
                .register(meterRegistry);
        this.lifetimeTimer = Timer.builder("jvm.virtual.threads.lifetime")
                .description("Virtual thread lifetime")
                .publishPercentiles(0.5, 0.95, 0.99)
                .register(meterRegistry);

        ThreadFactory baseFactory = Thread.ofVirtual()
                .name("monitored-vt-", 0)
                .inheritInheritableThreadLocals(true)
                .factory();
        this.monitoredFactory = runnable -> {
            Thread thread = baseFactory.newThread(() -> {
                try {
                    runnable.run();
                } finally {
                    // Record the lifetime even if the caller never reports completion.
                    recordThreadCompletion(Thread.currentThread().threadId());
                }
            });
            created.increment();
            threadStartTimes.put(thread.threadId(), System.nanoTime());
            return thread;
        };
    }

    /**
     * @return a factory for virtual threads named {@code monitored-vt-N} that are counted and
     *     timed by this collector
     */
    public ThreadFactory getThreadFactory() {
        return monitoredFactory;
    }

    /**
     * Records the lifetime of a thread created by this collector's factory. Calls for unknown or
     * already-recorded ids are ignored.
     *
     * @param threadId id of the finished thread
     */
    public void recordThreadCompletion(long threadId) {
        Long startTime = threadStartTimes.remove(threadId);
        if (startTime != null) {
            long duration = System.nanoTime() - startTime;
            lifetimeTimer.record(duration, TimeUnit.NANOSECONDS);
        }
    }
}
七、最佳实践配置总结
7.1 完整配置示例
spring:
  application:
    name: virtual-thread-demo
  # Application-defined settings bound under the "spring.virtual-threads" prefix.
  virtual-threads:
    enabled: true
    mode: hybrid
    executor:
      core-pool-size: 1
      max-pool-size: 10000
      keep-alive-seconds: 60
      thread-name-prefix: "app-vt-"
    tomcat:
      enabled: true
      max-connections: 10000
      connection-timeout: 30s
    database:
      connection-pool:
        max-size: 50
        validation-query: "SELECT 1"
        leak-detection-threshold: 60s
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 5
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      data-source-properties:
        useVirtualThreads: true
        cachePrepStmts: true
        prepStmtCacheSize: 250
  task:
    execution:
      pool:
        core-size: 1
        max-size: 10000
        queue-capacity: 0
        keep-alive: 60s
      thread-name-prefix: "async-vt-"
    scheduling:
      pool:
        size: 10
      thread-name-prefix: "sched-vt-"
  jpa:
    properties:
      hibernate:
        connection.handling_mode: DELAYED_ACQUISITION_AND_HOLD
        jdbc.batch_size: 20
        order_inserts: true
        order_updates: true
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus,threaddump
  # Spring Boot 3 location of the Prometheus export switch.
  prometheus:
    metrics:
      export:
        enabled: true
  metrics:
    tags:
      application: ${spring.application.name}
      environment: ${ENV:local}
  endpoint:
    health:
      show-details: always
logging:
  pattern:
    level: "%5p [${spring.application.name},%X{traceId:-},%X{spanId:-},%thread]"
  level:
    org.springframework: INFO
    com.example: DEBUG
# Application-defined tuning block.
virtual-thread-tuning:
  pinning-threshold-ms: 100
  stack-size-kb: 1024
  carrier-thread-count: ${CPU_COUNT:8}
  monitor-enabled: true
  monitor-interval-ms: 5000
7.2 主配置类
/**
 * Application entry point; sets virtual-thread related system properties before startup.
 */
@Slf4j
@SpringBootApplication
@EnableAsync
@EnableScheduling
@EnableTransactionManagement
@EnableConfigurationProperties({VirtualThreadProperties.class, VirtualThreadTuningProperties.class})
public class VirtualThreadApplication {

    /**
     * Sets the Spring virtual-thread switch and the JDK scheduler's parallelism and maximum pool
     * size as system properties, then starts the application.
     */
    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(VirtualThreadApplication.class);
        System.setProperty("spring.threads.virtual.enabled", "true");
        System.setProperty("jdk.virtualThreadScheduler.parallelism", String.valueOf(Runtime.getRuntime().availableProcessors()));
        System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "256");
        app.run(args);
    }

    /** Logs the virtual-thread switch, processor count and JVM version at startup. */
    @Bean
    public CommandLineRunner virtualThreadInfo() {
        return args -> {
            // Either we are already on a virtual thread, or the switch set in main() is on and
            // the runtime is a release where virtual threads are final (21+).
            boolean enabled = Thread.currentThread().isVirtual()
                    || (Boolean.getBoolean("spring.threads.virtual.enabled") && Runtime.version().feature() >= 21);
            log.info("Virtual threads enabled: {}", enabled);
            log.info("Available processors: {}", Runtime.getRuntime().availableProcessors());
            log.info("JVM version: {}", System.getProperty("java.version"));
        };
    }
}
/**
 * Settings bound from the {@code spring.virtual-threads} prefix, grouped into executor, Tomcat
 * and database sections. Field initializers are the defaults.
 */
@ConfigurationProperties(prefix = "spring.virtual-threads")
@Data
public class VirtualThreadProperties {
// Master switch.
private boolean enabled = true;
// Threading mode; see Mode.
private Mode mode = Mode.HYBRID;
private ExecutorConfig executor = new ExecutorConfig();
private TomcatConfig tomcat = new TomcatConfig();
private DatabaseConfig database = new DatabaseConfig();
/** Available threading modes. */
public enum Mode {
HYBRID,
VIRTUAL_ONLY,
PLATFORM_ONLY
}
/** Executor settings ({@code spring.virtual-threads.executor.*}). */
@Data
public static class ExecutorConfig {
private int corePoolSize = 1;
private int maxPoolSize = 10000;
private int keepAliveSeconds = 60;
private String threadNamePrefix = "app-vt-";
private boolean allowCoreThreadTimeout = true;
private boolean prestartAllCoreThreads = false;
}
/** Tomcat settings ({@code spring.virtual-threads.tomcat.*}). */
@Data
public static class TomcatConfig {
private boolean enabled = true;
private int maxConnections = 10000;
private Duration connectionTimeout = Duration.ofSeconds(30);
private int maxThreads = 1000;
private int minSpareThreads = 10;
private int acceptorThreadCount = 1;
}
/** Database settings ({@code spring.virtual-threads.database.*}). */
@Data
public static class DatabaseConfig {
private ConnectionPool connectionPool = new ConnectionPool();
/** Connection-pool settings ({@code spring.virtual-threads.database.connection-pool.*}). */
@Data
public static class ConnectionPool {
private int maxSize = 50;
private int minIdle = 5;
private String validationQuery = "SELECT 1";
private Duration leakDetectionThreshold = Duration.ofSeconds(60);
private boolean cachePrepStmts = true;
private int prepStmtCacheSize = 250;
}
}
}
7.3 健康检查
/**
 * Health indicator reporting virtual-thread counts and going DOWN when more than 10% of the
 * sampled virtual threads look pinned.
 *
 * <p>NOTE(review): the JDK documents {@code Thread.getAllStackTraces()} as covering platform
 * threads; confirm virtual threads show up in the sample on the target JDK.
 */
@Component
public class VirtualThreadHealthIndicator implements HealthIndicator {

    private final VirtualThreadMonitor monitor;
    private final VirtualThreadProperties properties;

    public VirtualThreadHealthIndicator(VirtualThreadMonitor monitor, VirtualThreadProperties properties) {
        this.monitor = monitor;
        this.properties = properties;
    }

    /**
     * @return UP with {@code enabled=false} when the feature is off; otherwise UP or DOWN with
     *     thread counts and mode; DOWN with the error if sampling fails
     */
    @Override
    public Health health() {
        if (!properties.isEnabled()) {
            return Health.up().withDetail("enabled", false).build();
        }
        try {
            // One snapshot for both figures, so "pinned" can never exceed "count".
            long virtualThreadCount = 0;
            long pinnedThreads = 0;
            for (Thread thread : Thread.getAllStackTraces().keySet()) {
                if (!thread.isVirtual()) {
                    continue;
                }
                virtualThreadCount++;
                if (isPinned(thread)) {
                    pinnedThreads++;
                }
            }
            Health.Builder builder = Health.up()
                    .withDetail("virtualThreads.count", virtualThreadCount)
                    .withDetail("virtualThreads.pinned", pinnedThreads)
                    .withDetail("mode", properties.getMode().toString());
            if (pinnedThreads > virtualThreadCount * 0.1) {
                builder.down()
                        .withDetail("reason", "Too many pinned virtual threads")
                        .withDetail("pinnedPercentage", String.format("%.1f%%", (pinnedThreads * 100.0 / virtualThreadCount)));
            }
            return builder.build();
        } catch (Exception e) {
            return Health.down(e).withDetail("error", e.getMessage()).build();
        }
    }

    // Heuristic only: RUNNABLE with more than 100 ms (100,000,000 ns) of CPU time.
    private boolean isPinned(Thread thread) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        long cpuTime = threadBean.getThreadCpuTime(thread.threadId());
        return thread.getState() == Thread.State.RUNNABLE && cpuTime > 100_000_000;
    }
}
7.4 调试和故障排除
/**
 * Opt-in debugging aid (enabled by {@code virtual-thread-tuning.debug=true}) that tracks
 * registered virtual threads, records events against them and dumps everything to the log.
 */
@Slf4j
@Component
@ConditionalOnProperty(name = "virtual-thread-tuning.debug", havingValue = "true")
public class VirtualThreadDebugger {

    private final ThreadMXBean threadBean;
    private final Map<Long, ThreadDebugInfo> debugInfos = new ConcurrentHashMap<>();

    /** Enables contention and CPU-time monitoring where the JVM supports them. */
    public VirtualThreadDebugger() {
        this.threadBean = ManagementFactory.getThreadMXBean();
        // Both setters throw UnsupportedOperationException on JVMs without the feature.
        if (this.threadBean.isThreadContentionMonitoringSupported()) {
            this.threadBean.setThreadContentionMonitoringEnabled(true);
        }
        if (this.threadBean.isThreadCpuTimeSupported()) {
            this.threadBean.setThreadCpuTimeEnabled(true);
        }
    }

    /**
     * Starts tracking {@code thread} if it is virtual and installs a handler that logs an
     * uncaught failure and stops tracking.
     */
    public void startMonitoring(Thread thread) {
        if (thread.isVirtual()) {
            ThreadDebugInfo info = new ThreadDebugInfo(thread);
            debugInfos.put(thread.threadId(), info);
            thread.setUncaughtExceptionHandler((t, e) -> {
                log.error("Virtual thread {} failed", t.getName(), e);
                debugInfos.remove(t.threadId());
            });
        }
    }

    /** Attaches an event to a tracked thread; ignored for threads that are not tracked. */
    public void recordEvent(long threadId, String event, Object data) {
        ThreadDebugInfo info = debugInfos.get(threadId);
        if (info != null) {
            info.recordEvent(event, data);
        }
    }

    /** Logs every tracked thread, then forgets threads that have terminated. */
    public void dumpDebugInfo() {
        debugInfos.forEach((id, info) -> {
            log.info("Thread {} ({}):", info.getThreadName(), info.getThreadId());
            log.info(" State: {}", info.getState());
            log.info(" CPU Time: {}ms", info.getCpuTime() / 1_000_000);
            log.info(" User Time: {}ms", info.getUserTime() / 1_000_000);
            log.info(" Blocked Count: {}", info.getBlockedCount());
            log.info(" Waited Count: {}", info.getWaitedCount());
            Map<Long, String> events = info.getEvents();
            if (!events.isEmpty()) {
                log.info(" Events:");
                events.forEach((time, event) -> {
                    log.info(" {}: {}", time, event);
                });
            }
            if (log.isDebugEnabled()) {
                log.debug(" Stack Trace:");
                for (StackTraceElement element : info.getStackTrace()) {
                    log.debug(" {}", element);
                }
            }
        });
        // Threads that finished normally never hit the uncaught-exception handler.
        debugInfos.values().removeIf(info -> info.getState() == Thread.State.TERMINATED);
    }

    /** Identity, live figures and recorded events for one tracked thread. */
    private static class ThreadDebugInfo {

        private final long threadId;
        private final String threadName;
        private final Thread thread;
        /** Events keyed by epoch milliseconds; guarded by {@code this}. */
        private final Map<Long, String> events = new LinkedHashMap<>();

        ThreadDebugInfo(Thread thread) {
            this.threadId = thread.threadId();
            this.threadName = thread.getName();
            this.thread = thread;
        }

        synchronized void recordEvent(String event, Object data) {
            String text = event + (data != null ? ": " + data : "");
            // Events in the same millisecond are joined instead of overwriting each other.
            events.merge(System.currentTimeMillis(), text, (earlier, later) -> earlier + "; " + later);
        }

        long getThreadId() {
            return threadId;
        }

        String getThreadName() {
            return threadName;
        }

        Thread.State getState() {
            return thread.getState();
        }

        /** CPU time in nanoseconds as reported by the thread MXBean. */
        long getCpuTime() {
            return ManagementFactory.getThreadMXBean().getThreadCpuTime(threadId);
        }

        /** User-mode time in nanoseconds as reported by the thread MXBean. */
        long getUserTime() {
            return ManagementFactory.getThreadMXBean().getThreadUserTime(threadId);
        }

        /** Blocked count from the thread MXBean, or 0 when it has no info for this thread. */
        long getBlockedCount() {
            var threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(threadId);
            return threadInfo != null ? threadInfo.getBlockedCount() : 0;
        }

        /** Waited count from the thread MXBean, or 0 when it has no info for this thread. */
        long getWaitedCount() {
            var threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(threadId);
            return threadInfo != null ? threadInfo.getWaitedCount() : 0;
        }

        /** @return a snapshot of the recorded events in insertion order */
        synchronized Map<Long, String> getEvents() {
            return new LinkedHashMap<>(events);
        }

        StackTraceElement[] getStackTrace() {
            return thread.getStackTrace();
        }
    }
}
通过以上详细的配置和解决方案,可以有效地在 Spring Boot 3.x 中集成和使用虚拟线程,充分发挥其高并发优势,同时避免常见的集成问题。
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online