跳到主要内容Java 多线程详解 | 极客日志Javajava
Java 多线程详解
Java 多线程详解涵盖线程基础概念、创建方式(Thread、Runnable、Callable)、生命周期管理、同步机制(synchronized、Lock、volatile)、线程池配置与使用、并发工具类(CountDownLatch、CompletableFuture 等)以及最佳实践。文章通过代码示例对比不同方案优劣,讲解死锁避免、异常处理及性能优化技巧,适合希望深入理解 Java 并发编程的开发者阅读。
灵魂伴侣27 浏览 Java 多线程详解
Thread、Runnable、Callable、线程同步、线程池、并发工具类完整教程
一、线程基础
1.1 进程与线程
进程(Process)
- 定义:程序的一次执行过程,是系统进行资源分配的基本单位
- 特点:拥有独立的内存空间、代码、数据和系统资源
- 隔离性:进程之间相互独立,一个进程崩溃不会影响其他进程
线程(Thread)
- 定义:进程中的一个执行单元,是 CPU 调度的基本单位
- 特点:共享进程的内存空间和资源
- 轻量级:线程切换开销远小于进程切换
进程 vs 线程对比:
| 维度 | 进程 | 线程 |
|---|
| 资源分配 | 独立的内存空间 | 共享进程内存 |
| 通信方式 | IPC(管道、消息队列等) | 直接读写共享变量 |
| 创建开销 | 大(需要分配资源) | 小(共享资源) |
| 切换开销 | 大(需要切换地址空间) | 小(只切换上下文) |
| 影响范围 | 崩溃不影响其他进程 | 崩溃导致整个进程终止 |
| 数据共享 | 困难(需要 IPC) | 容易(共享内存) |
| 适用场景 | 独立应用程序 | 同一应用内的并发任务 |
为什么需要多线程?
- ✅ 提高程序响应速度:UI 线程不被阻塞
- ✅ 充分利用 CPU:多核 CPU 并行执行
- ✅ 提高资源利用率:IO 等待时 CPU 可执行其他任务
- ✅ 简化程序结构:将复杂任务分解为多个独立线程
1.2 并发与并行
并发(Concurrency)
- 定义:多个任务在同一时间段内交替执行
- 场景:单核 CPU 通过时间片轮转实现
- 特点:宏观上同时进行,微观上交替执行
- 示例:一个人同时处理多个任务(切换处理)
并行(Parallelism)
- 定义:多个任务在同一时刻真正同时执行
- 场景:多核 CPU 同时执行多个线程
- 特点:真正的同时执行
- 示例:多个人同时处理不同任务
并发(单核): 时间 → [任务 A] [任务 B] ... ↑ 快速切换,看起来像同时执行
并行(多核): 核心 : ...
核心 : ...
核心 : ...
↑ 真正同时执行
[任务 A]
[任务 C]
[任务 B]
1
[任务 A]
[任务 A]
[任务 A]
[任务 A]
2
[任务 B]
[任务 B]
[任务 B]
[任务 B]
3
[任务 C]
[任务 C]
[任务 C]
[任务 C]
| 维度 | 并发 | 并行 |
|---|
| 执行方式 | 交替执行 | 同时执行 |
| CPU 要求 | 单核或多核 | 必须多核 |
| 目的 | 提高响应速度 | 提高吞吐量 |
| 实现 | 时间片轮转 | 多个 CPU 核心 |
1.3 多线程的挑战
- ✅ 提高程序性能
- ✅ 改善用户体验
- ✅ 充分利用硬件资源
- ⚠️ 线程安全问题:多个线程访问共享数据可能导致数据不一致
- ⚠️ 死锁问题:多个线程相互等待对方释放资源
- ⚠️ 性能开销:线程创建、切换、同步都有开销
- ⚠️ 调试困难:并发 bug 难以重现和调试
- ⚠️ 复杂性增加:需要考虑同步、通信、资源竞争等问题
二、创建线程
2.1 继承 Thread 类
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
t1.setName("线程 1");
t2.setName("线程 2");
t1.start();
t2.start();
}
}
2.2 实现 Runnable 接口
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
}
}
}
public class Main {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
Thread t1 = new Thread(runnable, "线程 1");
Thread t2 = new Thread(runnable, "线程 2");
t1.start();
t2.start();
}
}
2.3 实现 Callable 接口
import java.util.concurrent.*;
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
}
public class Main {
public static void main(String[] args) throws Exception {
MyCallable callable = new MyCallable();
FutureTask<Integer> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
thread.start();
Integer result = task.get();
System.out.println("结果:" + result);
}
}
2.4 Lambda 表达式创建
public class Main {
public static void main(String[] args) {
new Thread(() -> {
System.out.println("Lambda 线程执行");
}).start();
FutureTask<String> task = new FutureTask<>(() -> {
return "Lambda 返回值";
});
new Thread(task).start();
}
}
2.5 三种方式对比
| 方式 | 优点 | 缺点 | 使用场景 |
|---|
| 继承 Thread | 简单直接 | 不能继承其他类 | 简单任务 |
| 实现 Runnable | 可继承其他类,资源共享 | 无返回值 | 常规任务 |
| 实现 Callable | 有返回值,可抛异常 | 代码复杂 | 需要返回值 |
2.6 线程创建实战案例
import java.util.concurrent.*;
import java.util.*;
public class ThreadCreationPractice {
public static void main(String[] args) throws Exception {
int[] arr = new int[1000];
for (int i = 0; i < arr.length; i++) {
arr[i] = i + 1;
}
long sum = parallelSum(arr, 4);
System.out.println("数组和:" + sum);
List<String> files = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
processFilesInParallel(files);
String result = executeWithTimeout(() -> {
Thread.sleep(2000);
return "任务完成";
}, 3, TimeUnit.SECONDS);
System.out.println(result);
}
public static long parallelSum(int[] arr, int threadCount) throws Exception {
int chunkSize = arr.length / threadCount;
List<Future<Long>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? arr.length : (i + 1) * chunkSize;
Future<Long> future = executor.submit(() -> {
long sum = 0;
for (int j = start; j < end; j++) {
sum += arr[j];
}
return sum;
});
futures.add(future);
}
long totalSum = 0;
for (Future<Long> future : futures) {
totalSum += future.get();
}
executor.shutdown();
return totalSum;
}
public static void processFilesInParallel(List<String> files) {
List<Thread> threads = new ArrayList<>();
for (String file : files) {
Thread thread = new Thread(() -> {
System.out.println("处理文件:" + file + ", 线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "FileProcessor-" + file);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("所有文件处理完成");
}
public static <T> T executeWithTimeout(Callable<T> task, long timeout, TimeUnit unit) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(task);
try {
return future.get(timeout, unit);
} catch (TimeoutException e) {
future.cancel(true);
throw new RuntimeException("任务超时");
} finally {
executor.shutdown();
}
}
}
三、线程生命周期
3.1 线程状态
NEW(新建) ↓ start()
RUNNABLE(就绪/运行) ↓ 获取锁失败 / wait() / sleep() / join()
BLOCKED(阻塞)/ WAITING(等待)/ TIMED_WAITING(超时等待) ↓ 获取锁 / notify() / 超时
RUNNABLE ↓ run() 执行完毕
TERMINATED(终止)
3.2 线程常用方法
public class ThreadMethodDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("线程执行中");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
});
thread.start();
System.out.println("线程名:" + thread.getName());
System.out.println("状态:" + thread.getState());
thread.setPriority(Thread.MAX_PRIORITY);
System.out.println("存活:" + thread.isAlive());
thread.join();
System.out.println("主线程结束");
}
}
3.3 sleep vs wait
| 特性 | sleep() | wait() |
|---|
| 所属类 | Thread | Object |
| 释放锁 | 不释放 | 释放 |
| 使用位置 | 任何地方 | 同步代码块/方法 |
| 唤醒方式 | 自动(超时) | notify()/notifyAll() |
四、线程同步
4.1 线程安全问题
public class UnsafeDemo {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++;
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count++;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("count: " + count);
}
}
4.2 synchronized 关键字
public class SafeCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
public class SafeDemo {
private int count = 0;
private Object lock = new Object();
public void increment() {
synchronized (lock) {
count++;
}
}
public void decrement() {
synchronized (this) {
count--;
}
}
}
public class StaticSync {
private static int count = 0;
public static synchronized void increment() {
count++;
}
}
4.3 Lock 接口
import java.util.concurrent.locks.*;
public class LockDemo {
private int count = 0;
private Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public void tryIncrement() {
if (lock.tryLock()) {
try {
count++;
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁失败");
}
}
}
4.4 ReadWriteLock
import java.util.concurrent.locks.*;
public class ReadWriteLockDemo {
private int value = 0;
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
public int read() {
rwLock.readLock().lock();
try {
return value;
} finally {
rwLock.readLock().unlock();
}
}
public void write(int value) {
rwLock.writeLock().lock();
try {
this.value = value;
} finally {
rwLock.writeLock().unlock();
}
}
}
4.5 volatile 关键字
public class VolatileDemo {
private volatile boolean flag = false;
public void writer() {
flag = true;
}
public void reader() {
while (!flag) {
}
System.out.println("flag 已改变");
}
}
- ✅ 保证可见性:一个线程修改后,其他线程立即可见
- ✅ 禁止指令重排序
- ❌ 不保证原子性:
count++ 仍然不是线程安全的
volatile vs synchronized:
| 特性 | volatile | synchronized |
|---|
| 原子性 | ❌ 不保证 | ✅ 保证 |
| 可见性 | ✅ 保证 | ✅ 保证 |
| 有序性 | ✅ 保证 | ✅ 保证 |
| 性能 | 高(无锁) | 低(加锁) |
| 使用场景 | 状态标志 | 复合操作 |
4.6 原子类
import java.util.concurrent.atomic.*;
public class AtomicDemo {
private AtomicInteger count = new AtomicInteger(0);
private AtomicLong longValue = new AtomicLong(0);
private AtomicBoolean flag = new AtomicBoolean(false);
public void increment() {
count.incrementAndGet();
}
public void decrement() {
count.decrementAndGet();
}
public void addValue(int delta) {
count.addAndGet(delta);
}
public boolean compareAndSet(int expect, int update) {
return count.compareAndSet(expect, update);
}
public static void main(String[] args) throws InterruptedException {
AtomicDemo demo = new AtomicDemo();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果:" + demo.count.get());
}
}
| 类型 | 说明 | 示例 |
|---|
AtomicInteger | 原子整数 | count.incrementAndGet() |
AtomicLong | 原子长整数 | value.addAndGet(10) |
AtomicBoolean | 原子布尔 | flag.compareAndSet(false, true) |
AtomicReference<T> | 原子引用 | ref.set(new Object) |
LongAdder | 高性能计数器 | adder.increment() |
4.7 死锁问题
public class DeadlockDemo {
private static Object lock1 = new Object();
private static Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("线程 1 获取 lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
synchronized (lock2) {
System.out.println("线程 1 获取 lock2");
}
});
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("线程 2 获取 lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
synchronized (lock1) {
System.out.println("线程 2 获取 lock1");
}
});
});
t1.start();
t2.start();
}
}
synchronized (lock1) {
synchronized (lock2) {
}
}
Lock lock1 = new ReentrantLock();
Lock lock2 = new ReentrantLock();
if (lock1.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
if (lock2.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
- 使用 Lock 的 lockInterruptibly
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
4.8 线程安全的单例模式
public class Singleton1 {
private static final Singleton1 INSTANCE = new Singleton1();
private Singleton1() {}
public static Singleton1 getInstance() {
return INSTANCE;
}
}
public class Singleton2 {
private static volatile Singleton2 instance;
private Singleton2() {}
public static Singleton2 getInstance() {
if (instance == null) {
synchronized (Singleton2.class) {
if (instance == null) {
instance = new Singleton2();
}
}
}
return instance;
}
}
public class Singleton3 {
private Singleton3() {}
private static class Holder {
private static final Singleton3 INSTANCE = new Singleton3();
}
public static Singleton3 getInstance() {
return Holder.INSTANCE;
}
}
public enum Singleton4 {
INSTANCE;
public void doSomething() {
System.out.println("单例方法");
}
}
五、线程通信
5.1 wait/notify
public class WaitNotifyDemo {
private static Object lock = new Object();
private static boolean flag = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!flag) {
try {
System.out.println("t1 等待");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t1 继续执行");
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
flag = true;
System.out.println("t2 通知");
lock.notify();
}
});
t1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
t2.start();
}
}
5.2 生产者消费者模式
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumer {
private Queue<Integer> queue = new LinkedList<>();
private int maxSize = 10;
public synchronized void produce(int value) throws InterruptedException {
while (queue.size() == maxSize) {
wait();
}
queue.offer(value);
System.out.println("生产:" + value + ", 队列大小:" + queue.size());
notifyAll();
}
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
int value = queue.poll();
System.out.println("消费:" + value + ", 队列大小:" + queue.size());
notifyAll();
return value;
}
}
public class Main {
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
pc.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
pc.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
六、线程池
6.1 为什么使用线程池
- 降低资源消耗(复用线程)
- 提高响应速度(无需创建线程)
- 提高线程可管理性
6.2 创建线程池
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
ExecutorService singlePool = Executors.newSingleThreadExecutor();
ExecutorService cachedPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
fixedPool.execute(() -> {
System.out.println("任务执行");
});
fixedPool.shutdown();
}
}
6.3 自定义线程池
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("任务" + taskId + "执行,线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
6.4 线程池参数
| 参数 | 说明 |
|---|
| corePoolSize | 核心线程数 |
| maximumPoolSize | 最大线程数 |
| keepAliveTime | 空闲线程存活时间 |
| unit | 时间单位 |
| workQueue | 任务队列 |
| threadFactory | 线程工厂 |
| handler | 拒绝策略 |
6.5 拒绝策略
new ThreadPoolExecutor.AbortPolicy()
new ThreadPoolExecutor.CallerRunsPolicy()
new ThreadPoolExecutor.DiscardPolicy()
new ThreadPoolExecutor.DiscardOldestPolicy()
6.6 实战案例:批量下载
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class DownloadManager {
private ExecutorService executor;
public DownloadManager(int threadCount) {
this.executor = Executors.newFixedThreadPool(threadCount);
}
public void downloadFiles(List<String> urls) {
List<Future<Boolean>> futures = new ArrayList<>();
for (String url : urls) {
Future<Boolean> future = executor.submit(() -> {
return downloadFile(url);
});
futures.add(future);
}
for (int i = 0; i < futures.size(); i++) {
try {
boolean success = futures.get(i).get();
System.out.println("文件" + i + "下载" + (success ? "成功" : "失败"));
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
}
private boolean downloadFile(String url) {
System.out.println("下载:" + url + ", 线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return false;
}
return true;
}
}
public class Main {
public static void main(String[] args) {
List<String> urls = new ArrayList<>();
for (int i = 0; i < 10; i++) {
urls.add("http://example.com/file" + i + ".zip");
}
DownloadManager manager = new DownloadManager(3);
manager.downloadFiles(urls);
}
}
七、并发工具类
7.1 CountDownLatch
import java.util.concurrent.*;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
System.out.println("任务" + taskId + "开始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务" + taskId + "完成");
latch.countDown();
}).start();
}
System.out.println("等待所有任务完成...");
latch.await();
System.out.println("所有任务已完成");
}
}
- 主线程等待多个子线程完成
- 多个线程等待某个条件满足后同时开始
7.2 CyclicBarrier
让一组线程到达屏障点时被阻塞,直到最后一个线程到达。
import java.util.concurrent.*;
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程已到达屏障,开始下一阶段");
});
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("线程" + taskId + "准备中...");
Thread.sleep((taskId + 1) * 1000);
System.out.println("线程" + taskId + "已就绪");
barrier.await();
System.out.println("线程" + taskId + "继续执行");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
CountDownLatch vs CyclicBarrier:
| 特性 | CountDownLatch | CyclicBarrier |
|---|
| 可重用 | ❌ 不可重用 | ✅ 可重用 |
| 等待方式 | 一个或多个线程等待 | 所有线程互相等待 |
| 计数方式 | 递减到 0 | 递增到 N |
| 使用场景 | 主线程等待子线程 | 多线程协同工作 |
7.3 Semaphore
import java.util.concurrent.*;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程" + taskId + "获得许可,开始执行");
Thread.sleep(2000);
System.out.println("线程" + taskId + "执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
- 限流:控制并发访问数量
- 资源池:管理有限资源(如数据库连接池)
7.4 并发集合
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsDemo {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
map.putIfAbsent("key2", 2);
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("task1");
String task = queue.poll();
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(10);
try {
blockingQueue.put("item");
String item = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
| 场景 | 推荐集合 | 说明 |
|---|
| 高并发 Map | ConcurrentHashMap | 分段锁,性能好 |
| 读多写少 List | CopyOnWriteArrayList | 写时复制 |
| 无界队列 | ConcurrentLinkedQueue | 非阻塞 |
| 有界队列 | LinkedBlockingQueue | 阻塞 |
| 优先级队列 | PriorityBlockingQueue | 按优先级 |
| 延迟队列 | DelayQueue | 延迟获取 |
7.5 CompletableFuture
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("异步任务执行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "结果 1";
});
future1.thenAccept(result -> {
System.out.println("任务完成,结果:" + result);
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 10;
}).thenApply(num -> {
return num * 2;
}).thenApply(num -> {
return num + 5;
});
System.out.println("链式结果:" + future2.get());
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "任务 1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return "任务 2";
});
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
allTasks.get();
System.out.println("所有任务完成");
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1, task2);
System.out.println("最快完成的任务:" + anyTask.get());
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("任务失败");
}
return "成功";
}).exceptionally(ex -> {
System.out.println("捕获异常:" + ex.getMessage());
return "默认值";
});
System.out.println("结果:" + future3.get());
}
}
| 方法 | 说明 | 示例 |
|---|
supplyAsync() | 异步执行有返回值任务 | supplyAsync(() -> "result") |
runAsync() | 异步执行无返回值任务 | runAsync(() -> {...}) |
thenApply() | 转换结果 | thenApply(x -> x * 2) |
thenAccept() | 消费结果 | thenAccept(System.out::println) |
thenCompose() | 组合 Future | thenCompose(x -> anotherFuture) |
allOf() | 等待所有完成 | allOf(f1, f2, f3) |
anyOf() | 等待任意完成 | anyOf(f1, f2, f3) |
exceptionally() | 异常处理 | exceptionally(ex -> defaultValue) |
7.6 实战:并行处理大数据
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelProcessingDemo {
public static void main(String[] args) throws Exception {
List<Integer> data = IntStream.range(0, 1000000).boxed().collect(Collectors.toList());
long start1 = System.currentTimeMillis();
int result1 = processWithThreadPool(data);
long end1 = System.currentTimeMillis();
System.out.println("线程池处理耗时:" + (end1 - start1) + "ms, 结果:" + result1);
long start2 = System.currentTimeMillis();
int result2 = data.parallelStream().filter(n -> n % 2 == 0).mapToInt(n -> n * 2).sum();
long end2 = System.currentTimeMillis();
System.out.println("并行流处理耗时:" + (end2 - start2) + "ms, 结果:" + result2);
long start3 = System.currentTimeMillis();
int result3 = processWithCompletableFuture(data);
long end3 = System.currentTimeMillis();
System.out.println("CompletableFuture 处理耗时:" + (end3 - start3) + "ms, 结果:" + result3);
}
private static int processWithThreadPool(List<Integer> data) throws Exception {
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
int chunkSize = data.size() / threadCount;
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;
Future<Integer> future = executor.submit(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
int n = data.get(j);
if (n % 2 == 0) {
sum += n * 2;
}
}
return sum;
});
futures.add(future);
}
int totalSum = 0;
for (Future<Integer> future : futures) {
totalSum += future.get();
}
executor.shutdown();
return totalSum;
}
private static int processWithCompletableFuture(List<Integer> data) throws Exception {
int threadCount = Runtime.getRuntime().availableProcessors();
int chunkSize = data.size() / threadCount;
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.size() : (i + 1) * chunkSize;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int sum = 0;
for (int j = start; j < end; j++) {
int n = data.get(j);
if (n % 2 == 0) {
sum += n * 2;
}
}
return sum;
});
futures.add(future);
}
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.get();
return futures.stream().mapToInt(f -> {
try {
return f.get();
} catch (Exception e) {
return 0;
}
}).sum();
}
}
八、最佳实践
8.1 线程池配置
import java.util.concurrent.*;
public class ThreadPoolBestPractice {
ExecutorService bad1 = Executors.newFixedThreadPool(10);
ExecutorService bad2 = Executors.newCachedThreadPool();
ThreadPoolExecutor good = new ThreadPoolExecutor(
4,
8,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("MyPool-" + count++);
thread.setUncaughtExceptionHandler((t, e) -> {
System.err.println("线程" + t.getName() + "异常:" + e.getMessage());
});
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
| 任务类型 | 推荐大小 | 说明 |
|---|
| CPU 密集型 | N + 1 | N = CPU 核心数 |
| IO 密集型 | 2N | 或 N / (1 - 阻塞系数) |
| 混合型 | 根据实际测试调整 | 监控 CPU 和 IO 使用率 |
int cpuCount = Runtime.getRuntime().availableProcessors();
int cpuIntensivePoolSize = cpuCount + 1;
int ioIntensivePoolSize = cpuCount * 2;
8.2 异常处理
import java.util.concurrent.*;
public class ExceptionHandlingDemo {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
int result = 10 / 0;
} catch (Exception e) {
System.err.println("捕获异常:" + e.getMessage());
}
});
Thread t2 = new Thread(() -> {
int result = 10 / 0;
});
t2.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("线程" + thread.getName() + "异常:" + throwable.getMessage());
});
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.err.println("任务执行异常:" + t.getMessage());
}
}
};
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> future = pool.submit(() -> {
return 10 / 0;
});
try {
future.get();
} catch (ExecutionException e) {
System.err.println("任务异常:" + e.getCause().getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.start();
t2.start();
executor.shutdown();
pool.shutdown();
}
}
8.3 避免常见陷阱
public class CommonPitfalls {
public void pitfall1() {
Thread thread = new Thread(() -> System.out.println("执行"));
thread.run();
thread.start();
}
private Object lock = new Object();
public void pitfall2() {
synchronized (lock) {
if (someCondition()) {
return;
}
}
}
public void pitfall3() {
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
}).start();
}
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
});
}
executor.shutdown();
}
private int count = 0;
public void pitfall4() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
count++;
}).start();
}
private AtomicInteger safeCount = new AtomicInteger(0);
safeCount.incrementAndGet();
}
public void pitfall5() {
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.execute(() -> {
});
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
private boolean someCondition() {
return true;
}
}
8.4 性能优化技巧
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class PerformanceOptimization {
private static ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
public String formatDate(Date date) {
return dateFormat.get().format(date);
}
private LongAdder counter = new LongAdder();
public void increment() {
counter.increment();
}
public long getCount() {
return counter.sum();
}
private Object lock1 = new Object();
private Object lock2 = new Object();
public void fineLock() {
synchronized (lock1) {
}
synchronized (lock2) {
}
}
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private Map<String, String> cache = new HashMap<>();
public String read(String key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void write(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public void batchProcess() {
List<String> batch = new ArrayList<>();
queue.drainTo(batch, 100);
for (String item : batch) {
}
}
}
8.5 监控和调试
import java.lang.management.*;
import java.util.concurrent.*;
public class ThreadMonitoring {
public static void main(String[] args) {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
System.out.println("当前线程数:" + threadMXBean.getThreadCount());
System.out.println("峰值线程数:" + threadMXBean.getPeakThreadCount());
System.out.println("总启动线程数:" + threadMXBean.getTotalStartedThreadCount());
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
System.out.println("检测到死锁,涉及线程数:" + deadlockedThreads.length);
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads);
for (ThreadInfo info : threadInfos) {
System.out.println("死锁线程:" + info.getThreadName());
}
}
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
System.out.println("活跃线程数:" + executor.getActiveCount());
System.out.println("核心线程数:" + executor.getCorePoolSize());
System.out.println("最大线程数:" + executor.getMaximumPoolSize());
System.out.println("当前线程数:" + executor.getPoolSize());
System.out.println("队列大小:" + executor.getQueue().size());
System.out.println("已完成任务数:" + executor.getCompletedTaskCount());
System.out.println("总任务数:" + executor.getTaskCount());
Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : allThreads.entrySet()) {
Thread thread = entry.getKey();
System.out.println("\n线程:" + thread.getName() + ", 状态:" + thread.getState());
for (StackTraceElement element : entry.getValue()) {
System.out.println(" " + element);
}
}
executor.shutdown();
}
}
九、快速参考
线程创建选择
需要返回值? ├─ 是 → Callable └─ 否 → 需要继承其他类? ├─ 是 → Runnable └─ 否 → Thread 或 Runnable
同步方式选择
| 场景 | 推荐方式 | 说明 |
|---|
| 简单同步 | synchronized | 简单易用 |
| 需要尝试获取锁 | Lock.tryLock() | 可超时 |
| 读多写少 | ReadWriteLock | 读并发 |
| 可见性保证 | volatile | 轻量级 |
| 原子操作 | AtomicXxx | 无锁 |
| 高并发计数 | LongAdder | 性能最好 |
线程池选择
| 场景 | 线程池类型 | 说明 |
|---|
| 固定任务数 | FixedThreadPool | 固定线程数 |
| 单线程顺序执行 | SingleThreadExecutor | 顺序执行 |
| 大量短任务 | CachedThreadPool | 动态扩展 |
| 定时任务 | ScheduledThreadPool | 定时调度 |
| 自定义需求 | ThreadPoolExecutor | 完全控制 |
并发工具选择
| 需求 | 工具类 | 使用场景 |
|---|
| 等待多个线程完成 | CountDownLatch | 主线程等待 |
| 多线程协同 | CyclicBarrier | 互相等待 |
| 限流 | Semaphore | 控制并发数 |
| 线程安全 Map | ConcurrentHashMap | 高并发读写 |
| 线程安全 List | CopyOnWriteArrayList | 读多写少 |
| 阻塞队列 | BlockingQueue | 生产消费 |
| 异步编程 | CompletableFuture | 链式调用 |
常用方法速查
thread.start()
thread.join()
thread.interrupt()
Thread.sleep(millis)
Thread.yield()
thread.setName(name)
thread.setPriority(priority)
thread.isAlive()
thread.getState()
lock.lock()
lock.unlock()
lock.tryLock()
lock.tryLock(time, unit)
lock.lockInterruptibly()
executor.execute(runnable)
executor.submit(callable)
executor.shutdown()
executor.shutdownNow()
executor.awaitTermination()
最佳实践清单
- 优先使用线程池而非直接创建线程
- 给线程和线程池命名,便于调试
- 使用有界队列防止内存溢出
- 合理设置线程池大小(CPU 密集型:N+1,IO 密集型:2N)
- 使用
volatile 保证可见性
- 使用原子类实现线程安全计数
- 使用
ThreadLocal 避免共享状态
- 捕获线程异常,避免线程终止
- 使用
try-finally 确保锁释放
- 优先使用 JUC 工具类而非自己实现
- 不要使用
Executors 创建线程池(可能 OOM)
- 不要在循环中创建大量线程
- 不要忘记关闭线程池
- 不要直接调用
run() 方法
- 不要在持有锁时执行耗时操作
- 不要使用
stop()、suspend()、resume()(已废弃)
- 不要忽略
InterruptedException
- 不要在
finally 块中使用 return
十、总结
本文档全面介绍了 Java 多线程编程的核心知识:
1. 线程基础
- 进程与线程的区别
- 并发与并行的概念
- 多线程的优势和挑战
2. 创建线程
- 继承 Thread 类
- 实现 Runnable 接口
- 实现 Callable 接口
- Lambda 表达式创建
- 实战案例:并行计算、文件处理、超时控制
3. 线程生命周期
- 6 种线程状态
- 常用方法(start、join、sleep、interrupt)
- sleep vs wait 的区别
4. 线程同步
- synchronized 关键字(方法、代码块、静态)
- Lock 接口(ReentrantLock、ReadWriteLock)
- volatile 关键字
- 原子类(AtomicInteger、LongAdder)
- 死锁问题及解决方案
- 线程安全的单例模式
5. 线程通信
6. 线程池
- 为什么使用线程池
- 4 种预定义线程池
- 自定义线程池(ThreadPoolExecutor)
- 线程池参数和拒绝策略
- 实战案例:批量下载
7. 并发工具类
- CountDownLatch:等待多个线程完成
- CyclicBarrier:多线程协同
- Semaphore:限流控制
- 并发集合(ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue)
- CompletableFuture:异步编程
- 实战案例:并行处理大数据
8. 最佳实践
- 线程池配置和大小计算
- 异常处理(4 种方式)
- 避免常见陷阱(5 个典型错误)
- 性能优化技巧(ThreadLocal、LongAdder、读写锁)
- 监控和调试(死锁检测、线程池监控)
关键要点
| 主题 | 核心内容 |
|---|
| 线程创建 | 优先使用 Runnable/Callable,避免继承 Thread |
| 线程安全 | 使用 synchronized、Lock、原子类保证安全 |
| 线程池 | 手动创建 ThreadPoolExecutor,避免 Executors |
| 并发工具 | 善用 JUC 工具类,不要重复造轮子 |
| 性能优化 | ThreadLocal、LongAdder、读写锁、批量操作 |
| 异常处理 | 必须捕获异常,避免线程意外终止 |
| 资源管理 | 及时关闭线程池,使用 try-finally 释放锁 |
学习路径
- 入门阶段:掌握线程创建、生命周期、基本同步
- 进阶阶段:理解线程池、Lock、原子类、并发集合
- 高级阶段:掌握 CompletableFuture、性能优化、问题排查
- 实战阶段:在项目中应用,积累经验
推荐资源
下一步
- 深入学习 JVM 内存模型(JMM)
- 了解 happens-before 原则
- 掌握无锁编程(CAS)
- 学习分布式并发(分布式锁、消息队列)
- 实践高并发系统设计
记住:并发编程是一门实践的艺术,需要在实际项目中不断积累经验。从简单开始,逐步深入,注重代码质量和性能优化。
相关免费在线工具
- Keycode 信息
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
- Escape 与 Native 编解码
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
- JavaScript / HTML 格式化
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
- JavaScript 压缩与混淆
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online