虚拟线程概述
什么是虚拟线程?
虚拟线程(Virtual Threads)是 JDK 实现的轻量级线程,由 JVM 而非操作系统管理。它们与传统的平台线程(Platform Threads,即操作系统线程)形成对比。
Java 21 虚拟线程是 Project Loom 的核心成果,由 JVM 管理而非操作系统。相比平台线程,虚拟线程轻量级,可创建数百万个,适合高并发 I/O 场景。内容包括虚拟线程的创建方法、核心应用场景、高级特性及内部机制。提供了性能调优建议、错误处理、监控调试方法及常见问题解决方案。掌握虚拟线程能显著提升 Java 应用的并发处理能力。

虚拟线程概述
虚拟线程(Virtual Threads)是 JDK 实现的轻量级线程,由 JVM 而非操作系统管理。它们与传统的平台线程(Platform Threads,即操作系统线程)形成对比。
java.lang.Thread API| 特性 | 平台线程 | 虚拟线程 |
|---|---|---|
| 创建成本 | 高(MB 级内存) | 极低(KB 级内存) |
| 最大数量 | 几千个 | 数百万个 |
| 调度者 | 操作系统 | JVM |
| 阻塞行为 | 阻塞整个 OS 线程 | 自动挂起,释放载体线程 |
| 适用场景 | CPU 密集型任务 | I/O 密集型任务 |
虚拟线程的创建方法
// 方法 1:直接创建并启动
Thread virtualThread1 = Thread.ofVirtual().start(() -> {
System.out.println("Hello from virtual thread!");
});
// 方法 2:构建 Thread 对象后启动
Thread virtualThread2 = Thread.ofVirtual().name("my-virtual-thread").unstarted(() -> {
System.out.println("Named virtual thread");
});
virtualThread2.start();
// 方法 3:设置守护线程
Thread virtualThread3 = Thread.ofVirtual().daemon(true).unstarted(() -> {
System.out.println("Daemon virtual thread");
});
// 获取虚拟线程构建器
Thread.Builder builder = Thread.ofVirtual();
// 链式调用设置属性
Thread virtualThread = builder
.name("custom-virtual-thread")
.daemon(false)
.unstarted(() -> {
System.out.println("Custom virtual thread running");
});
virtualThread.start();
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 创建虚拟线程执行器
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交任务,每个任务在一个新的虚拟线程中执行
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " running on " + Thread.currentThread());
return "Result " + taskId;
});
}
} // 自动关闭执行器
// 创建虚拟线程工厂
ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();
// 使用工厂创建线程
Thread thread = virtualThreadFactory.newThread(() -> {
System.out.println("Created via factory");
});
thread.start();
虚拟线程的核心使用场景
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class HighConcurrencyIO {
private static final HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
public static void main(String[] args) throws InterruptedException {
// 模拟 10,000 个并发 HTTP 请求
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<?>[] futures = new CompletableFuture[10000];
for (int i = 0; i < 10000; i++) {
final int requestId = i;
futures[i] = CompletableFuture.runAsync(() -> {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://httpbin.org/delay/1"))
.timeout(Duration.ofSeconds(5))
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println("Request " + requestId + " completed with status: " + response.statusCode());
} catch (Exception e) {
System.err.println("Request " + requestId + " failed: " + e.getMessage());
}
}, executor);
}
// 等待所有请求完成
CompletableFuture.allOf(futures).join();
}
System.out.println("All requests completed!");
}
}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DatabaseVirtualThreads {
private static final String DB_URL = "jdbc:postgresql://localhost:5432/test";
private static final String DB_USER = "user";
private static final String DB_PASSWORD = "password";
public static void main(String[] args) throws Exception {
// 传统方式需要大量数据库连接
// 虚拟线程方式:少量连接 + 大量虚拟线程
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 模拟 1000 个并发数据库查询
CompletableFuture<?>[] futures = new CompletableFuture[1000];
for (int i = 0; i < 1000; i++) {
final int queryId = i;
futures[i] = CompletableFuture.runAsync(() -> {
Connection conn = null;
try {
// 从连接池获取连接(这里简化为直接创建)
conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
PreparedStatement stmt = conn.prepareStatement(
"SELECT * FROM users WHERE id = ?");
stmt.setInt(1, queryId);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
System.out.println("Query " + queryId + ": " + rs.getString("name"));
}
} catch (Exception e) {
System.err.println("Query " + queryId + " failed: " + e.getMessage());
} finally {
if (conn != null) {
try {
conn.close(); // 返回到连接池
} catch (Exception e) {
// ignore
}
}
}
}, executor);
}
CompletableFuture.allOf(futures).join();
}
}
}
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
public class VirtualThreadWebServer {
public static void main(String[] args) throws IOException {
// 创建 HTTP 服务器
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
// 设置虚拟线程执行器处理每个请求
Executor virtualThreadExecutor = runnable -> {
Thread.ofVirtual().start(runnable);
};
server.setExecutor(virtualThreadExecutor);
// 添加处理器
server.createContext("/hello", new HttpHandler() {
@Override
public void handle(HttpExchange exchange) throws IOException {
// 模拟 I/O 操作(如数据库查询、外部 API 调用)
try {
Thread.sleep(100); // 模拟延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
String response = "Hello from virtual thread! Thread: " + Thread.currentThread();
exchange.sendResponseHeaders(200, response.length());
OutputStream os = exchange.getResponseBody();
os.write(response.getBytes());
os.close();
}
});
server.start();
System.out.println("Server started on http://localhost:8080/hello");
}
}
虚拟线程的高级特性
虚拟线程完全支持 ThreadLocal:
public class VirtualThreadLocalExample {
private static final ThreadLocal<String> context = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
context.set("Context for " + threadName);
System.out.println("Thread: " + threadName + ", Context: " + context.get());
// 清理 ThreadLocal(最佳实践)
context.remove();
};
// 在虚拟线程中使用 ThreadLocal
Thread vt1 = Thread.ofVirtual().name("VT-1").unstarted(task);
Thread vt2 = Thread.ofVirtual().name("VT-2").unstarted(task);
vt1.start();
vt2.start();
vt1.join();
vt2.join();
}
}
虚拟线程支持标准的中断机制:
public class VirtualThreadInterruptExample {
public static void main(String[] args) throws InterruptedException {
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
try {
System.out.println("Virtual thread started");
Thread.sleep(5000); // 可中断的阻塞操作
System.out.println("Virtual thread finished normally");
} catch (InterruptedException e) {
System.out.println("Virtual thread was interrupted");
Thread.currentThread().interrupt(); // 保持中断状态
}
});
virtualThread.start();
// 1 秒后中断虚拟线程
Thread.sleep(1000);
virtualThread.interrupt();
virtualThread.join();
}
}
虚拟线程提供完整的栈跟踪信息:
public class VirtualThreadStackTrace {
public static void main(String[] args) {
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
methodA();
});
virtualThread.start();
try {
virtualThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void methodA() {
methodB();
}
private static void methodB() {
// 打印当前线程的栈跟踪
Thread.dumpStack();
// 或者获取栈跟踪
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
System.out.println("Stack trace size: " + stackTrace.length);
for (StackTraceElement element : stackTrace) {
System.out.println(element);
}
}
}
虚拟线程的最佳实践
适合使用虚拟线程的场景:
不适合使用虚拟线程的场景:
// 1. 正确使用 try-with-resources 管理执行器
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交任务
executor.submit(task);
} // 自动关闭,等待所有任务完成
// 2. 避免在虚拟线程中进行 CPU 密集型计算
Runnable cpuIntensiveTask = () -> {
// 错误:在虚拟线程中进行大量计算
// long result = heavyComputation();
// 正确:将 CPU 密集型任务提交到平台线程池
ExecutorService cpuPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
CompletableFuture.supplyAsync(() -> heavyComputation(), cpuPool)
.thenAccept(result -> {
// 处理结果(回到虚拟线程)
System.out.println("Result: " + result);
});
};
// 3. 合理管理资源
public class ResourceManagementExample {
private static final ExecutorService VIRTUAL_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();
private static final ExecutorService CPU_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void shutdown() {
VIRTUAL_EXECUTOR.close();
CPU_EXECUTOR.shutdown();
}
}
public class VirtualThreadErrorHandling {
public static void main(String[] args) {
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
try {
// 可能抛出异常的操作
riskyOperation();
} catch (Exception e) {
// 处理异常
System.err.println("Exception in virtual thread: " + e.getMessage());
}
});
// 设置未捕获异常处理器
virtualThread.setUncaughtExceptionHandler((thread, exception) -> {
System.err.println("Uncaught exception in " + thread.getName() + ": " + exception.getMessage());
});
virtualThread.start();
}
private static void riskyOperation() {
throw new RuntimeException("Something went wrong!");
}
}
虚拟线程的内部机制
public class CarrierThreadExample {
public static void main(String[] args) throws InterruptedException {
Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
System.out.println("Virtual thread: " + Thread.currentThread());
System.out.println("Carrier thread: " + Thread.currentThread().carrierThread());
// 模拟 I/O 阻塞
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("After sleep - Virtual: " + Thread.currentThread());
System.out.println("After sleep - Carrier: " + Thread.currentThread().carrierThread());
});
virtualThread.start();
virtualThread.join();
}
}
当虚拟线程遇到阻塞操作时:
与现有并发工具的集成
public class CompletableFutureWithVirtualThreads {
public static void main(String[] args) throws Exception {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> fetchDataFromAPI1(), executor);
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> fetchDataFromAPI2(), executor);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2);
combined.join();
System.out.println("Data 1: " + future1.get());
System.out.println("Data 2: " + future2.get());
}
}
private static String fetchDataFromAPI1() {
// 模拟 API 调用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
return "API1 Result";
}
private static String fetchDataFromAPI2() {
// 模拟 API 调用
try {
Thread.sleep(1500);
} catch (InterruptedException e) {}
return "API2 Result";
}
}
public class StreamWithVirtualThreads {
public static void main(String[] args) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 并行流通常使用 ForkJoinPool,但可以自定义
var results = IntStream.range(0, 1000)
.boxed()
.parallel()
.map(i -> processItem(i, executor))
.collect(Collectors.toList());
System.out.println("Processed " + results.size() + " items");
}
}
private static String processItem(int item, ExecutorService executor) {
// 在虚拟线程中处理每个项目
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// 模拟 I/O 操作
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
return "Processed " + item;
}, executor);
return future.join();
}
}
调试和监控虚拟线程
# 启用虚拟线程相关的调试信息
java -Djdk.virtualThreadScheduler.parallelism=8 YourApp
# 监控虚拟线程统计信息
java -XX:+UnlockDiagnosticVMOptions -XX:+LogVMOutput YourApp
import java.lang.management.ManagementFactory;
import com.sun.management.ThreadMXBean;
public class VirtualThreadMonitoring {
public static void main(String[] args) {
ThreadMXBean threadBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
// 获取线程信息
long[] threadIds = threadBean.getAllThreadIds();
System.out.println("Total threads: " + threadIds.length);
// 虚拟线程的线程 ID 通常是负数
for (long id : threadIds) {
if (id < 0) {
System.out.println("Virtual thread ID: " + id);
}
}
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VirtualThreadLogging {
private static final Logger logger = LoggerFactory.getLogger(VirtualThreadLogging.class);
public static void main(String[] args) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
// 日志中包含线程信息
logger.info("Processing task {} on thread {}", taskId, Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
logger.info("Completed task {}", taskId);
});
}
}
}
}
常见问题和解决方案
// 问题:ThreadLocal 未清理
private static final ThreadLocal<ExpensiveResource> resource = new ThreadLocal<>();
public void badExample() {
Thread.ofVirtual().start(() -> {
resource.set(new ExpensiveResource()); // 未清理!
// ... do work
});
}
// 解决方案:确保清理 ThreadLocal
public void goodExample() {
Thread.ofVirtual().start(() -> {
try {
resource.set(new ExpensiveResource());
// ... do work
} finally {
resource.remove(); // 确保清理
}
});
}
虚拟线程的死锁检测与平台线程相同:
public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = Thread.ofVirtual().unstarted(() -> {
synchronized (lock1) {
System.out.println("T1 got lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
synchronized (lock2) {
System.out.println("T1 got lock2");
}
}
});
Thread t2 = Thread.ofVirtual().unstarted(() -> {
synchronized (lock2) {
System.out.println("T2 got lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
synchronized (lock1) {
System.out.println("T2 got lock1");
}
}
});
t1.start();
t2.start();
// JVM 仍然可以检测到死锁
}
}
总结
虚拟线程是 Java 并发编程的重大突破,它让开发者能够以同步的方式编写高并发代码,同时获得异步编程的性能优势。掌握虚拟线程的使用方法,将极大地提升 Java 应用的并发处理能力。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online