JAVA多线程并发编程:并发容器与线程协作实战
JAVA多线程并发编程:并发容器与线程协作实战
💡 学习目标:掌握JAVA中常用并发容器的特性与适用场景,理解线程间协作的核心原理,能够运用并发容器和协作工具解决实际并发问题。
💡 学习重点:并发容器与普通容器的区别、ConcurrentHashMap 核心原理、CountDownLatch/CyclicBarrier/Semaphore 的使用、生产者消费者模式实现。
1.1 为什么需要并发容器?
在多线程场景下,普通的集合容器(如 HashMap、ArrayList)是线程不安全的。多个线程同时对其进行读写操作时,会导致数据错乱、ConcurrentModificationException 异常等问题。
⚠️ 注意事项:即使使用 Collections.synchronizedXXX() 方法包装普通容器,也只是通过 synchronized 实现简单的加锁。这种方式锁粒度较粗,并发性能较低。
✅ 核心结论:并发容器是JAVA为多线程场景设计的高性能容器。它们通过细粒度锁或无锁算法实现线程安全,能够在保证数据一致性的同时,大幅提升并发访问效率。
1.2 常用并发容器详解
1.2.1 ConcurrentHashMap:高效并发哈希表
ConcurrentHashMap 是 HashMap 的并发安全版本,是日常开发中使用频率最高的并发容器。
1.2.1.1 核心特点
- 分段锁(JDK1.7)→ CAS + 同步锁(JDK1.8)
- JDK1.7采用分段锁机制,将数据分成多个Segment。每个Segment独立加锁,不同Segment的操作互不阻塞。
- JDK1.8抛弃分段锁,采用 CAS +
synchronized实现。锁粒度缩小到单个Node节点,并发性能进一步提升。
- 支持高并发读写:读操作无锁(通过
volatile保证可见性),写操作仅锁定当前节点,不会阻塞其他操作。 - 不允许
null键值:与HashMap不同,ConcurrentHashMap的键和值都不能为null,避免歧义。
1.2.1.2 核心方法使用示例
importjava.util.concurrent.ConcurrentHashMap;/** * ConcurrentHashMap 实战示例 */publicclassConcurrentHashMapDemo{publicstaticvoidmain(String[] args)throwsInterruptedException{ConcurrentHashMap<String,Integer> map =newConcurrentHashMap<>();// 1. 插入数据 map.put("apple",10); map.put("banana",20); map.put("orange",30);// 2. 并发修改数据Thread thread1 =newThread(()->{for(int i =0; i <5; i++){// 原子性操作:获取并增加 map.computeIfPresent("apple",(k, v)-> v +1);System.out.println(Thread.currentThread().getName()+" : apple = "+ map.get("apple"));}},"线程1");Thread thread2 =newThread(()->{for(int i =0; i <5; i++){ map.computeIfPresent("apple",(k, v)-> v +1);System.out.println(Thread.currentThread().getName()+" : apple = "+ map.get("apple"));}},"线程2"); thread1.start(); thread2.start(); thread1.join(); thread2.join();// 3. 遍历数据 map.forEach((k, v)->System.out.println(k +" : "+ v));}}1.2.1.3 适用场景
- 高并发下的键值对存储场景,如缓存、用户会话存储。
- 读多写少的业务场景,能充分发挥其无锁读的性能优势。
1.2.2 CopyOnWriteArrayList:写时复制数组
CopyOnWriteArrayList 是 ArrayList 的并发安全版本,核心思想是写时复制。
1.2.2.1 核心原理
① 📝 写操作:当执行添加、删除、修改操作时,会复制一份新的数组。在新数组上完成操作后,再将原数组的引用指向新数组。
② 🔍 读操作:直接读取原数组,无需加锁,保证了读操作的高效性。
③ ⚠️ 数据一致性:写操作是原子性的,读操作可能读取到旧数据。该容器适用于读多写少的场景。
1.2.2.2 使用示例
importjava.util.Iterator;importjava.util.concurrent.CopyOnWriteArrayList;/** * CopyOnWriteArrayList 实战示例 */publicclassCopyOnWriteArrayListDemo{publicstaticvoidmain(String[] args)throwsInterruptedException{CopyOnWriteArrayList<String> list =newCopyOnWriteArrayList<>(); list.add("Java"); list.add("Python"); list.add("Go");// 1. 并发遍历与添加Thread writeThread =newThread(()->{ list.add("C++");System.out.println(Thread.currentThread().getName()+" 添加元素:C++");},"写线程");Thread readThread =newThread(()->{Iterator<String> iterator = list.iterator();while(iterator.hasNext()){System.out.println(Thread.currentThread().getName()+" 遍历元素:"+ iterator.next());// 模拟延迟try{Thread.sleep(100);}catch(InterruptedException e){ e.printStackTrace();}}},"读线程"); readThread.start();Thread.sleep(50); writeThread.start(); readThread.join(); writeThread.join();// 2. 最终遍历结果System.out.println("最终集合元素:"+ list);}}1.2.2.3 适用场景
- 读操作远多于写操作的场景,如系统配置读取、日志记录列表。
- 不要求数据实时一致性的场景,允许读取到旧数据。
1.2.3 BlockingQueue:阻塞队列
BlockingQueue 是一个支持阻塞操作的队列,是实现生产者消费者模式的核心工具。
1.2.3.1 核心特性
- 入队阻塞:当队列已满时,入队操作会阻塞,直到队列有空闲空间。
- 出队阻塞:当队列为空时,出队操作会阻塞,直到队列中有元素。
- 常用实现类:
ArrayBlockingQueue(数组实现,有界)、LinkedBlockingQueue(链表实现,可选有界)、SynchronousQueue(同步队列,无容量)。
1.2.3.2 核心方法对比
| 方法类型 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
|---|---|---|---|---|
| 入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 出队 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | - | - |
1.2.3.3 使用示例
importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;/** * BlockingQueue 实战示例 */publicclassBlockingQueueDemo{// 定义有界阻塞队列,容量为3privatestaticfinalBlockingQueue<String> queue =newArrayBlockingQueue<>(3);publicstaticvoidmain(String[] args){// 生产者线程newThread(()->{String[] products ={"产品A","产品B","产品C","产品D"};for(String product : products){try{System.out.println(Thread.currentThread().getName()+" 生产:"+ product); queue.put(product);Thread.sleep(500);}catch(InterruptedException e){ e.printStackTrace();}}},"生产者").start();// 消费者线程newThread(()->{while(true){try{String product = queue.take();System.out.println(Thread.currentThread().getName()+" 消费:"+ product);Thread.sleep(1000);}catch(InterruptedException e){ e.printStackTrace();}}},"消费者").start();}}1.3 线程协作工具类
在复杂的并发场景中,需要多个线程协同完成任务。JAVA提供了 CountDownLatch、CyclicBarrier、Semaphore 等工具类,简化线程协作的开发。
1.3.1 CountDownLatch:倒计时门闩
CountDownLatch 允许一个或多个线程等待其他线程完成操作后,再继续执行。
1.3.1.1 核心原理
- 初始化时指定计数器值,该值代表需要等待的线程数量。
- 每个线程完成任务后,调用
countDown()方法,计数器值减1。 - 主线程调用
await()方法,会阻塞直到计数器值变为0。 - 计数器值不可重置,
CountDownLatch只能使用一次。
1.3.1.2 实战案例:多线程任务汇总
importjava.util.concurrent.CountDownLatch;/** * CountDownLatch 实战:多线程数据统计 */publicclassCountDownLatchDemo{// 定义计数器,需要等待3个任务完成privatestaticfinalCountDownLatch latch =newCountDownLatch(3);publicstaticvoidmain(String[] args)throwsInterruptedException{System.out.println("主线程:开始执行数据统计任务");// 任务1:用户数据统计newThread(()->{try{Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+":用户数据统计完成");}catch(InterruptedException e){ e.printStackTrace();}finally{ latch.countDown();}},"任务线程1").start();// 任务2:订单数据统计newThread(()->{try{Thread.sleep(1500);System.out.println(Thread.currentThread().getName()+":订单数据统计完成");}catch(InterruptedException e){ e.printStackTrace();}finally{ latch.countDown();}},"任务线程2").start();// 任务3:商品数据统计newThread(()->{try{Thread.sleep(2000);System.out.println(Thread.currentThread().getName()+":商品数据统计完成");}catch(InterruptedException e){ e.printStackTrace();}finally{ latch.countDown();}},"任务线程3").start();// 等待所有任务完成 latch.await();System.out.println("主线程:所有数据统计完成,生成汇总报表");}}1.3.1.3 适用场景
- 主线程等待多个子线程完成初始化任务,如系统启动时加载配置、连接资源。
- 批量任务执行场景,需要等待所有任务完成后进行结果汇总。
1.3.2 CyclicBarrier:循环栅栏
CyclicBarrier 允许一组线程相互等待,直到所有线程都到达某个屏障点后,再继续执行。
1.3.2.1 核心原理
- 初始化时指定参与线程数量和屏障动作。屏障动作是所有线程到达后执行的任务。
- 每个线程到达屏障点时,调用
await()方法,会阻塞直到所有线程都到达。 - 所有线程到达后,执行屏障动作,然后重置计数器。
CyclicBarrier可以重复使用。
1.3.2.2 实战案例:多线程数据分片处理
importjava.util.concurrent.CyclicBarrier;/** * CyclicBarrier 实战:数据分片处理 */publicclassCyclicBarrierDemo{// 定义循环栅栏,4个线程参与,所有线程到达后执行屏障动作privatestaticfinalCyclicBarrier barrier =newCyclicBarrier(4,()->{System.out.println("屏障动作:所有分片数据处理完成,开始合并结果");});publicstaticvoidmain(String[] args){// 模拟4个数据分片for(int i =0; i <4; i++){int shard = i +1;newThread(()->{try{System.out.println(Thread.currentThread().getName()+":处理分片"+ shard +"数据");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+":分片"+ shard +"处理完成,等待其他线程");// 到达屏障点 barrier.await();System.out.println(Thread.currentThread().getName()+":结果合并完成,继续后续任务");}catch(Exception e){ e.printStackTrace();}},"分片线程"+ shard).start();}}}1.3.2.3 CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数器重置 | 不可重置,只能用一次 | 可重置,可重复使用 |
| 等待方向 | 一个或多个线程等待其他线程 | 多个线程相互等待 |
| 核心场景 | 主线程等待子线程完成 | 线程组协同完成任务 |
1.3.3 Semaphore:信号量
Semaphore 用于控制同时访问特定资源的线程数量,通过许可证机制实现资源限流。
1.3.3.1 核心原理
- 初始化时指定许可证数量,代表允许同时访问资源的线程数。
- 线程访问资源前,调用
acquire()方法获取许可证。无可用许可证时,线程会阻塞。 - 线程释放资源后,调用
release()方法归还许可证。 - 许可证数量可以动态调整,支持公平/非公平模式。
1.3.3.2 实战案例:接口限流
importjava.util.concurrent.Semaphore;/** * Semaphore 实战:接口限流 */publicclassSemaphoreDemo{// 定义信号量,允许3个线程同时访问privatestaticfinalSemaphore semaphore =newSemaphore(3);// 模拟接口方法publicstaticvoidapiInvoke(String threadName)throwsInterruptedException{// 获取许可证 semaphore.acquire();try{System.out.println(threadName +":获取许可证,开始调用接口");Thread.sleep(1000);System.out.println(threadName +":接口调用完成");}finally{// 归还许可证 semaphore.release();System.out.println(threadName +":归还许可证,当前可用许可证:"+ semaphore.availablePermits());}}publicstaticvoidmain(String[] args){// 模拟10个线程并发调用接口for(int i =0; i <10; i++){int finalI = i;newThread(()->{try{apiInvoke("线程"+ finalI);}catch(InterruptedException e){ e.printStackTrace();}}).start();}}}1.3.3.3 适用场景
- 接口限流,控制并发访问数,防止系统过载。
- 资源池访问控制,如数据库连接池、线程池的资源分配。
1.4 实战案例:基于并发容器实现生产者消费者模式
生产者消费者模式是并发编程中的经典模式。它通过阻塞队列解耦生产者和消费者,平衡生产和消费速度。
1.4.1 需求分析
- 生产者线程生产商品,将商品放入阻塞队列。
- 消费者线程从阻塞队列中取出商品进行消费。
- 当队列满时,生产者阻塞;当队列空时,消费者阻塞。
- 支持多个生产者和多个消费者并发执行。
1.4.2 代码实现
importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.TimeUnit;/** * 实战:基于 BlockingQueue 实现生产者消费者模式 */publicclassProducerConsumerPattern{// 定义有界阻塞队列,容量为5privatestaticfinalBlockingQueue<Product> queue =newArrayBlockingQueue<>(5);// 商品实体类staticclassProduct{privateString id;privateString name;publicProduct(String id,String name){this.id = id;this.name = name;}@OverridepublicStringtoString(){return"Product{id='"+ id +"',+ name +"'}";}}// 生产者类staticclassProducerimplementsRunnable{privateString producerName;publicProducer(String producerName){this.producerName = producerName;}@Overridepublicvoidrun(){int count =1;while(true){try{Product product =newProduct("P"+ count,"商品"+ count); queue.put(product);System.out.println(producerName +" 生产:"+ product +",队列当前大小:"+ queue.size()); count++;// 模拟生产耗时TimeUnit.SECONDS.sleep(1);}catch(InterruptedException e){ e.printStackTrace();Thread.currentThread().interrupt();}}}}// 消费者类staticclassConsumerimplementsRunnable{privateString consumerName;publicConsumer(String consumerName){this.consumerName = consumerName;}@Overridepublicvoidrun(){while(true){try{Product product = queue.take();System.out.println(consumerName +" 消费:"+ product +",队列当前大小:"+ queue.size());// 模拟消费耗时TimeUnit.SECONDS.sleep(2);}catch(InterruptedException e){ e.printStackTrace();Thread.currentThread().interrupt();}}}}publicstaticvoidmain(String[] args){// 启动2个生产者线程newThread(newProducer("生产者1")).start();newThread(newProducer("生产者2")).start();// 启动3个消费者线程newThread(newConsumer("消费者1")).start();newThread(newConsumer("消费者2")).start();newThread(newConsumer("消费者3")).start();}}1.4.3 运行结果分析
- 当队列满时,生产者线程会阻塞,直到消费者消费商品腾出空间。
- 当队列空时,消费者线程会阻塞,直到生产者生产新的商品。
- 多个生产者和消费者可以并发执行,系统运行稳定,不会出现数据错乱。
✅ 实战结论:基于 BlockingQueue 实现生产者消费者模式,无需手动加锁和控制线程状态。这种方式代码简洁、性能稳定,是并发编程中的优选方案。
1.5 并发容器与协作工具选型建议
| 工具类型 | 具体实现 | 核心优势 | 适用场景 |
|---|---|---|---|
| 并发Map | ConcurrentHashMap | 高并发读写、细粒度锁 | 缓存存储、键值对数据共享 |
| 并发List | CopyOnWriteArrayList | 读操作无锁、性能高 | 读多写少、配置列表 |
| 阻塞队列 | ArrayBlockingQueue/LinkedBlockingQueue | 自动阻塞、解耦生产者消费者 | 任务队列、消息传递 |
| 线程协作 | CountDownLatch | 主线程等待子线程完成 | 任务汇总、系统启动 |
| 线程协作 | CyclicBarrier | 线程组相互等待、可重复使用 | 数据分片处理、批量任务 |
| 限流工具 | Semaphore | 控制并发访问数 | 接口限流、资源池管理 |
1.6 本章小结
💡 本章重点讲解了JAVA中常用的并发容器和线程协作工具。包括 ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue 的核心原理与使用方法,以及 CountDownLatch、CyclicBarrier、Semaphore 三个协作工具的实战场景。
💡 通过生产者消费者模式的完整案例,掌握了如何结合并发容器和协作工具,实现高效、安全的并发编程。
✅ 并发容器和协作工具是JAVA并发编程的核心组件。合理选择和使用这些工具,能够大幅降低并发编程的复杂度,提升系统的稳定性和性能。