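Java 21 Virtual Threads: Core Features and Best Practices
Virtual threads in Java 21 are the main deliverable of Project Loom. They are managed by the JVM rather than the operating system, and compared with platform threads they are lightweight enough that millions can be created, which makes them a good fit for high-concurrency I/O workloads. This article covers how to create virtual threads, their core use cases, advanced features, and internal mechanics, followed by performance tuning advice, error handling, monitoring and debugging, and fixes for common problems. Used well, virtual threads can substantially raise the concurrency a Java application can handle.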
古灵精怪
Virtual Threads Overview
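What Is a Virtual Thread?
Virtual threads are lightweight threads implemented by the JDK and managed by the JVM rather than the operating system. They contrast with traditional platform threads, which map to operating-system threads.
Key Characteristics
- Lightweight: very cheap to create, so millions can exist at the same time
- Managed: scheduled by the JVM, not the operating system
- Compatible: they are instances of java.lang.Thread and work with the existing Thread API
- Automatic scheduling: a virtual thread that blocks is suspended automatically, freeing the underlying platform thread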
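The compatibility point can be checked directly. The following is a minimal sketch (the class name `VirtualThreadBasics` and the thread name `demo-vt` are made up for illustration) that starts one virtual thread and asks the ordinary `Thread` API what it is:

```java
public class VirtualThreadBasics {

    /** Starts one virtual thread and reports what the Thread API says about it. */
    static String describe() {
        StringBuilder seen = new StringBuilder();
        Thread vt = Thread.ofVirtual().name("demo-vt").start(() -> {
            Thread self = Thread.currentThread();
            seen.append(self.getName())
                .append(" virtual=").append(self.isVirtual())
                .append(" daemon=").append(self.isDaemon());
        });
        try {
            vt.join(); // join() also makes the task's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        // prints: demo-vt virtual=true daemon=true
        System.out.println(describe());
    }
}
```

Note that `isDaemon()` reports `true`: virtual threads are always daemon threads, so a program that must wait for them has to `join()` them or use an executor.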
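Comparison with Platform Threads
| Feature | Platform threads | Virtual threads |
|---|---|---|
| Creation cost | High (memory on the order of MB) | Very low (memory on the order of KB) |
| Practical maximum | A few thousand | Millions |
| Scheduler | Operating system | JVM |
| Blocking behavior | Blocks the whole OS thread | Suspended automatically, freeing the carrier thread |
| Best suited for | CPU-bound tasks | I/O-bound tasks |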
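To make the "practical maximum" row concrete, here is a rough sketch that runs a large number of blocking tasks, one virtual thread each. The class name `ManyVirtualThreads`, the 50 ms sleep, and the task count are illustrative assumptions, not measurements from the article; the same loop with one platform thread per task would typically exhaust memory or OS limits much sooner.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ManyVirtualThreads {

    /** Runs `count` virtual threads that each block briefly; returns how many finished. */
    static int runSleepers(int count) {
        AtomicInteger finished = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(50); // stands in for blocking I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    finished.incrementAndGet();
                });
            }
        } // close() waits for all submitted tasks to complete
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished: " + runSleepers(100_000));
    }
}
```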
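Creating Virtual Threads
Using Thread.ofVirtual()
// Create and start a virtual thread in one step
Thread virtualThread1 = Thread.ofVirtual().start(() -> {
    System.out.println("Hello from virtual thread!");
});
// Create a named virtual thread, then start it explicitly
Thread virtualThread2 = Thread.ofVirtual().name("my-virtual-thread").unstarted(() -> {
    System.out.println("Named virtual thread");
});
virtualThread2.start();
// Virtual threads are always daemon threads. Thread.Builder.OfVirtual has no
// daemon(...) method, so nothing needs to be configured for that.
Thread virtualThread3 = Thread.ofVirtual().unstarted(() -> {
    System.out.println("Daemon virtual thread");
});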
Using Thread.Builder
Thread.Builder builder = Thread.ofVirtual();
// The builder has no daemon setting for virtual threads: they are always daemon threads
Thread virtualThread = builder
        .name("custom-virtual-thread")
        .unstarted(() -> {
            System.out.println("Custom virtual thread running");
        });
virtualThread.start();
Using Executors.newVirtualThreadPerTaskExecutor()
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Each submitted task runs on a new virtual thread;
// leaving the try block waits for all of them to finish
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 10; i++) {
        final int taskId = i;
        executor.submit(() -> {
            System.out.println("Task " + taskId + " running on " + Thread.currentThread());
            return "Result " + taskId;
        });
    }
}
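Creating Threads from a ThreadFactory
import java.util.concurrent.ThreadFactory;
// A factory that produces virtual threads, for APIs that accept a ThreadFactory
ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();
Thread thread = virtualThreadFactory.newThread(() -> {
    System.out.println("Created via factory");
});
thread.start();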
High-Concurrency I/O
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class HighConcurrencyIO {
    private static final HttpClient httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
    public static void main(String[] args) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<?>[] futures = new CompletableFuture[10000];
            for (int i = 0; i < 10000; i++) {
                final int requestId = i;
                // Each request blocks in send(), but only its own virtual thread waits
                futures[i] = CompletableFuture.runAsync(() -> {
                    try {
                        HttpRequest request = HttpRequest.newBuilder()
                                .uri(URI.create("https://httpbin.org/delay/1"))
                                .timeout(Duration.ofSeconds(5))
                                .build();
                        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
                        System.out.println("Request " + requestId + " completed with status: " + response.statusCode());
                    } catch (Exception e) {
                        System.err.println("Request " + requestId + " failed: " + e.getMessage());
                    }
                }, executor);
            }
            CompletableFuture.allOf(futures).join();
        }
        System.out.println("All requests completed!");
    }
}
Database Connection Pool Optimization
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DatabaseVirtualThreads {
    private static final String DB_URL = "jdbc:postgresql://localhost:5432/test";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    public static void main(String[] args) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<?>[] futures = new CompletableFuture[1000];
            for (int i = 0; i < 1000; i++) {
                final int queryId = i;
                futures[i] = CompletableFuture.runAsync(() -> {
                    // This demo opens one connection per task; a real application would
                    // borrow from a connection pool. try-with-resources closes the
                    // result set, statement, and connection even when the query fails.
                    try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
                         PreparedStatement stmt = conn.prepareStatement(
                                 "SELECT * FROM users WHERE id = ?")) {
                        stmt.setInt(1, queryId);
                        try (ResultSet rs = stmt.executeQuery()) {
                            while (rs.next()) {
                                System.out.println("Query " + queryId + ": " + rs.getString("name"));
                            }
                        }
                    } catch (Exception e) {
                        System.err.println("Query " + queryId + " failed: " + e.getMessage());
                    }
                }, executor);
            }
            CompletableFuture.allOf(futures).join();
        }
    }
}
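Because virtual threads are cheap, the database (not the thread count) becomes the limit: a thousand tasks can easily ask for more connections than the server allows. One common way to cap this, shown here as a sketch rather than as the article's own method, is to guard the scarce resource with a `java.util.concurrent.Semaphore`. The class name `BoundedResourceAccess`, the permit count, and the 5 ms sleep standing in for a query are all illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedResourceAccess {

    /** Runs `tasks` virtual threads but lets at most `permits` of them use the resource at once. */
    static int maxObservedConcurrency(int tasks, int permits) {
        Semaphore gate = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    gate.acquire(); // waiting here parks only this virtual thread
                    try {
                        int now = inFlight.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // placeholder for the real query
                    } finally {
                        inFlight.decrementAndGet();
                        gate.release();
                    }
                    return null;
                });
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent users of the resource: "
                + maxObservedConcurrency(500, 10));
    }
}
```

The peak never exceeds the permit count, no matter how many virtual threads are started.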
Web Server Request Handling
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
public class VirtualThreadWebServer {
    public static void main(String[] args) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        // Handle every request on its own virtual thread
        Executor virtualThreadExecutor = runnable -> Thread.ofVirtual().start(runnable);
        server.setExecutor(virtualThreadExecutor);
        server.createContext("/hello", new HttpHandler() {
            @Override
            public void handle(HttpExchange exchange) throws IOException {
                try {
                    Thread.sleep(100); // simulate blocking work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                String response = "Hello from virtual thread! Thread: " + Thread.currentThread();
                // Content-Length must be the byte count, not the character count
                byte[] body = response.getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            }
        });
        server.start();
        System.out.println("Server started on http://localhost:8080/hello");
    }
}
Thread-Local Variables (ThreadLocal)
public class VirtualThreadLocalExample {
    private static final ThreadLocal<String> context = new ThreadLocal<>();
    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();
            // Each virtual thread sees its own copy of the value
            context.set("Context for " + threadName);
            System.out.println("Thread: " + threadName + ", Context: " + context.get());
            context.remove();
        };
        Thread vt1 = Thread.ofVirtual().name("VT-1").unstarted(task);
        Thread vt2 = Thread.ofVirtual().name("VT-2").unstarted(task);
        vt1.start();
        vt2.start();
        vt1.join();
        vt2.join();
    }
}
Thread Interruption
public class VirtualThreadInterruptExample {
    public static void main(String[] args) throws InterruptedException {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            try {
                System.out.println("Virtual thread started");
                Thread.sleep(5000);
                System.out.println("Virtual thread finished normally");
            } catch (InterruptedException e) {
                System.out.println("Virtual thread was interrupted");
                // Restore the interrupt flag for any code further up the stack
                Thread.currentThread().interrupt();
            }
        });
        virtualThread.start();
        Thread.sleep(1000);
        // Interrupting works exactly as it does for platform threads
        virtualThread.interrupt();
        virtualThread.join();
    }
}
Thread Stack Traces
public class VirtualThreadStackTrace {
    public static void main(String[] args) {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            methodA();
        });
        virtualThread.start();
        try {
            virtualThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    private static void methodA() {
        methodB();
    }
    private static void methodB() {
        // Print the current stack to stderr
        Thread.dumpStack();
        // Or inspect the frames programmatically
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        System.out.println("Stack trace size: " + stackTrace.length);
        for (StackTraceElement element : stackTrace) {
            System.out.println(element);
        }
    }
}
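When to Use Virtual Threads
Suitable for
- I/O-bound tasks (network requests, file operations, database queries)
- High-concurrency server applications
- Workloads that need many concurrent tasks, each of them lightweight
Not suitable for
- CPU-bound tasks (use a platform thread pool instead)
- Tasks that hold a lock for a long time
- Native code called through JNI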
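The lock item deserves a concrete illustration. On JDK 21, a virtual thread that blocks while inside a `synchronized` block stays pinned to its carrier thread, whereas blocking while holding a `java.util.concurrent` lock lets it unmount (this is described in JEP 444). The sketch below guards a critical section with `ReentrantLock`; the class name `LockInsteadOfSynchronized` and the 1 ms sleep are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockInsteadOfSynchronized {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static int counter = 0;

    /** Blocks briefly while holding the lock, then updates shared state. */
    static void slowIncrement() throws InterruptedException {
        LOCK.lock();
        try {
            Thread.sleep(1); // blocking call inside the critical section
            counter++;
        } finally {
            LOCK.unlock();
        }
    }

    /** Runs `tasks` virtual threads through the critical section and returns the final count. */
    static int run(int tasks) {
        LOCK.lock();
        try {
            counter = 0;
        } finally {
            LOCK.unlock();
        }
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    slowIncrement();
                    return null;
                });
            }
        }
        LOCK.lock(); // read under the lock so the value is safely published
        try {
            return counter;
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("Counter: " + run(200));
    }
}
```

Short `synchronized` sections that never block are still fine; the concern is only blocking calls made while the monitor is held.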
Performance Tuning Tips
// Create one virtual thread per task; virtual threads are cheap, so do not pool them
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(task);
}
// Hand CPU-bound work to a fixed-size platform thread pool.
// Create the pool once and reuse it, rather than once per task.
ExecutorService cpuPool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
Runnable cpuIntensiveTask = () -> {
    CompletableFuture.supplyAsync(() -> heavyComputation(), cpuPool)
            .thenAccept(result -> {
                System.out.println("Result: " + result);
            });
};
// Keep long-lived executors in one place and shut them down explicitly
public class ResourceManagementExample {
    private static final ExecutorService VIRTUAL_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();
    private static final ExecutorService CPU_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    public static void shutdown() {
        VIRTUAL_EXECUTOR.close();
        CPU_EXECUTOR.shutdown();
    }
}
Error Handling and Monitoring
public class VirtualThreadErrorHandling {
    public static void main(String[] args) throws InterruptedException {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            try {
                riskyOperation();
            } catch (Exception e) {
                // Handle expected failures inside the task
                System.err.println("Exception in virtual thread: " + e.getMessage());
            }
        });
        // Fallback that runs only for exceptions the task does not catch itself
        virtualThread.setUncaughtExceptionHandler((thread, exception) -> {
            System.err.println("Uncaught exception in " + thread.getName() + ": " + exception.getMessage());
        });
        virtualThread.start();
        // Virtual threads are daemon threads: wait, or the JVM may exit before the task runs
        virtualThread.join();
    }
    private static void riskyOperation() {
        throw new RuntimeException("Something went wrong!");
    }
}
Carrier Threads
public class CarrierThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            // Thread has no public accessor for the carrier. On current JDKs the
            // virtual thread's toString() shows it after '@', for example
            // VirtualThread[#22]/runnable@ForkJoinPool-1-worker-1
            System.out.println("Before sleep: " + Thread.currentThread());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // After resuming, the virtual thread may be mounted on a different carrier
            System.out.println("After sleep: " + Thread.currentThread());
        });
        virtualThread.start();
        virtualThread.join();
    }
}
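Suspension and Resumption
- The JVM intercepts the blocking operation
- The virtual thread is suspended (unmounted from its carrier)
- The carrier thread is released to run other virtual threads
- When the blocking operation completes, the virtual thread is scheduled onto some carrier thread and resumes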
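The effect of these steps can be observed from the outside with a timing experiment. The sketch below (class name `UnmountDemo` and the task counts are illustrative assumptions) starts many virtual threads that each sleep for the same interval. If blocked threads kept their carriers, the total time would be close to tasks × sleep divided by the number of carriers; because they are unmounted, the whole batch should finish in roughly one sleep interval plus scheduling overhead. Exact timings depend on the machine, so none are quoted here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnmountDemo {

    /** Runs `tasks` virtual threads that each sleep `sleepMillis`; returns total wall time in ms. */
    static long elapsedMillis(int tasks, long sleepMillis) {
        long start = System.nanoTime();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    Thread.sleep(sleepMillis); // the carrier is released while this thread waits
                    return null;
                });
            }
        } // close() waits for every task
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int tasks = 2_000;
        long sleepMillis = 100;
        System.out.println("Sequential cost would be " + (tasks * sleepMillis) + " ms");
        System.out.println("Actual wall time: " + elapsedMillis(tasks, sleepMillis) + " ms");
    }
}
```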
CompletableFuture
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureWithVirtualThreads {
    public static void main(String[] args) throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Pass the virtual-thread executor so the suppliers do not run on the common pool
            CompletableFuture<String> future1 = CompletableFuture
                    .supplyAsync(() -> fetchDataFromAPI1(), executor);
            CompletableFuture<String> future2 = CompletableFuture
                    .supplyAsync(() -> fetchDataFromAPI2(), executor);
            CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2);
            combined.join();
            System.out.println("Data 1: " + future1.get());
            System.out.println("Data 2: " + future2.get());
        }
    }
    private static String fetchDataFromAPI1() {
        try {
            Thread.sleep(1000); // simulate a remote call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "API1 Result";
    }
    private static String fetchDataFromAPI2() {
        try {
            Thread.sleep(1500); // simulate a slower remote call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "API2 Result";
    }
}
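Stream API
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class StreamWithVirtualThreads {
    public static void main(String[] args) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Submit every item first so all tasks run concurrently on virtual threads
            List<CompletableFuture<String>> futures = IntStream.range(0, 1000)
                    .mapToObj(i -> processItem(i, executor))
                    .toList();
            // Then collect the results. A parallel stream is not needed here and
            // would only block common-pool workers inside join().
            List<String> results = futures.stream()
                    .map(CompletableFuture::join)
                    .toList();
            System.out.println("Processed " + results.size() + " items");
        }
    }
    private static CompletableFuture<String> processItem(int item, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10); // simulate blocking work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Processed " + item;
        }, executor);
    }
}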
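JVM Options
# Set how many carrier threads the virtual thread scheduler uses (defaults to the number of available processors)
java -Djdk.virtualThreadScheduler.parallelism=8 YourApp
# Unlock diagnostic VM options and write VM output to a log file
java -XX:+UnlockDiagnosticVMOptions -XX:+LogVMOutput YourApp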
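JMX Monitoring
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
public class VirtualThreadMonitoring {
    public static void main(String[] args) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        // ThreadMXBean covers platform threads only. Virtual threads do not appear in
        // getAllThreadIds(), and their IDs are positive like any other thread's,
        // so they cannot be picked out by testing for negative IDs.
        long[] threadIds = threadBean.getAllThreadIds();
        System.out.println("Platform threads: " + threadIds.length);
        // To inspect virtual threads, take a thread dump instead, for example:
        //   jcmd <pid> Thread.dump_to_file -format=json <file>
    }
}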
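Logging
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VirtualThreadLogging {
    private static final Logger logger = LoggerFactory.getLogger(VirtualThreadLogging.class);
    public static void main(String[] args) {
        // Virtual threads have an empty name by default, so name them (vt-0, vt-1, ...)
        // to make the thread field in log lines useful
        ThreadFactory factory = Thread.ofVirtual().name("vt-", 0).factory();
        try (ExecutorService executor = Executors.newThreadPerTaskExecutor(factory)) {
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    logger.info("Processing task {} on thread {}", taskId, Thread.currentThread().getName());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    logger.info("Completed task {}", taskId);
                });
            }
        }
    }
}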
Memory Leaks
private static final ThreadLocal<ExpensiveResource> resource = new ThreadLocal<>();
// Risky: the value is never removed and stays reachable for as long as the thread lives
public void badExample() {
    Thread.ofVirtual().start(() -> {
        resource.set(new ExpensiveResource());
    });
}
// Better: always clear the ThreadLocal once the work is done
public void goodExample() {
    Thread.ofVirtual().start(() -> {
        try {
            resource.set(new ExpensiveResource());
        } finally {
            resource.remove();
        }
    });
}
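Deadlock Detection
public class DeadlockExample {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();
    public static void main(String[] args) throws InterruptedException {
        // t1 takes lock1 then lock2; t2 takes them in the opposite order
        Thread t1 = Thread.ofVirtual().unstarted(() -> {
            synchronized (lock1) {
                System.out.println("T1 got lock1");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock2) {
                    System.out.println("T1 got lock2");
                }
            }
        });
        Thread t2 = Thread.ofVirtual().unstarted(() -> {
            synchronized (lock2) {
                System.out.println("T2 got lock2");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                synchronized (lock1) {
                    System.out.println("T2 got lock1");
                }
            }
        });
        t1.start();
        t2.start();
        // Virtual threads are daemon threads, so wait for them explicitly.
        // If they deadlock, both joins time out and the threads are still alive.
        t1.join(2000);
        t2.join(2000);
        System.out.println("Still blocked: t1=" + t1.isAlive() + ", t2=" + t2.isAlive());
    }
}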
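Advantages of Virtual Threads
- Very high concurrency: can comfortably support millions of concurrent tasks
- Simpler programming model: no need for complex asynchronous callbacks
- Backward compatible: works with the existing Thread API
- Automatic resource management: a blocked virtual thread releases its carrier thread automatically
Recommendations
- I/O-bound tasks: prefer virtual threads
- CPU-bound tasks: keep using platform thread pools
- Resource management: clean up ThreadLocal values and release resources
- Monitoring and debugging: use the existing JVM tooling
Virtual threads are a major step forward for concurrent programming in Java. They let developers write high-concurrency code in a plain synchronous style while getting scalability comparable to asynchronous code. Knowing how to use them well can greatly improve the concurrency a Java application can handle.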