Java中的LinkedBlockingQueue详解
一、概述
LinkedBlockingQueue 是 Java 并发包(java.util.concurrent)中基于链表实现的阻塞队列,支持可选容量限制(默认无界),通过 双锁机制(ReentrantLock)和 条件变量(Condition)实现线程安全。其核心设计目标是高吞吐量和生产者-消费者模型的高效协作。
关键特性
- 线程安全:通过分离的
putLock(入队锁)和takeLock(出队锁)实现并发控制,减少锁竞争。 - 阻塞操作:
put()和take()方法在队列满或空时自动阻塞,支持超时等待。 - FIFO 顺序:严格遵循先进先出原则。
- 可选容量:构造时可指定最大容量(有界),默认无界(
Integer.MAX_VALUE)。 - 弱一致性迭代器:遍历时可能看到部分更新,但不会抛出
ConcurrentModificationException。
二、内部数据结构
1. 节点类 Node<E>
staticclassNode<E>{E item;// 存储元素Node<E> next;// 指向下一个节点Node(E x){ item = x;}}- 链表结构:通过
head(头节点)和last(尾节点)维护队列顺序。 - 哨兵节点:初始化时
head和last均指向一个item=null的哨兵节点,简化边界处理。
2. 核心属性
| 属性 | 作用 |
|---|---|
capacity | 队列最大容量(默认 Integer.MAX_VALUE) |
count | 当前元素数量(AtomicInteger 保证原子性) |
putLock/takeLock | 入队和出队操作的独占锁 |
notEmpty/notFull | 条件变量,分别用于队列空和队列满时的线程阻塞与唤醒 |
三、核心方法与实现原理
1. 入队操作
put(E e)
publicvoidput(E e)throwsInterruptedException{if(e ==null)thrownewNullPointerException();int c =-1;Node<E> node =newNode<>(e);finalReentrantLock putLock =this.putLock;finalAtomicInteger count =this.count; putLock.lockInterruptibly();// 获取入队锁try{while(count.get()== capacity){ notFull.await();// 队列满时阻塞}enqueue(node);// 入队操作 c = count.getAndIncrement();if(c +1< capacity) notFull.signal();// 唤醒其他生产者线程}finally{ putLock.unlock();}if(c ==0)signalNotEmpty();// 若队列之前为空,唤醒消费者线程}- 阻塞逻辑:队列满时调用
notFull.await(),释放锁并等待。 - 唤醒机制:入队成功后,若队列未满,通过
notFull.signal()唤醒其他生产者线程。
offer(E e)
publicbooleanoffer(E e){if(e ==null)thrownewNullPointerException();finalAtomicInteger count =this.count;if(count.get()== capacity)returnfalse;// 非阻塞,直接返回失败int c =-1;Node<E> node =newNode<>(e);finalReentrantLock putLock =this.putLock; putLock.lock();try{if(count.get()< capacity){enqueue(node); c = count.getAndIncrement();if(c +1< capacity) notFull.signal();}}finally{ putLock.unlock();}if(c ==0)signalNotEmpty();return c >=0;}- 非阻塞特性:队列满时立即返回
false,不阻塞线程。
2. 出队操作
take()
publicEtake()throwsInterruptedException{E x;int c =-1;finalAtomicInteger count =this.count;finalReentrantLock takeLock =this.takeLock; takeLock.lockInterruptibly();// 获取出队锁try{while(count.get()==0){ notEmpty.await();// 队列空时阻塞} x =dequeue();// 出队操作 c = count.getAndDecrement();if(c >1) notEmpty.signal();// 唤醒其他消费者线程}finally{ takeLock.unlock();}if(c == capacity)signalNotFull();// 若队列之前满,唤醒生产者线程return x;}- 阻塞逻辑:队列空时调用
notEmpty.await(),释放锁并等待。 - 唤醒机制:出队成功后,若队列非空,通过
notEmpty.signal()唤醒其他消费者线程。
poll()
publicEpoll(){finalAtomicInteger count =this.count;if(count.get()==0)returnnull;// 非阻塞,直接返回 nullE x =null;int c =-1;finalReentrantLock takeLock =this.takeLock; takeLock.lock();try{if(count.get()>0){ x =dequeue(); c = count.getAndDecrement();if(c >1) notEmpty.signal();}}finally{ takeLock.unlock();}if(c == capacity)signalNotFull();return x;}- 非阻塞特性:队列空时立即返回
null,不阻塞线程。
四、线程安全与锁机制
1. 双锁分离
putLock:控制入队操作,允许多个生产者并发插入。takeLock:控制出队操作,允许多个消费者并发移除。- 优势:减少锁竞争,提升吞吐量(相比单锁的
ArrayBlockingQueue)。
2. 条件变量
notEmpty:队列空时,消费者线程在此条件上等待。notFull:队列满时,生产者线程在此条件上等待。- 唤醒策略:插入或移除元素后,通过
signal()或signalAll()唤醒等待线程。
五、适用场景
- 生产者-消费者模型:平衡生产者和消费者的处理速度,避免资源浪费。
- 线程池任务队列:如
ThreadPoolExecutor默认使用LinkedBlockingQueue作为任务缓冲区。 - 数据流水线处理:在多阶段数据处理中传递数据。
- 限流场景:通过设置容量限制防止系统过载。
六、与其他队列的对比
| 特性 | LinkedBlockingQueue | ArrayBlockingQueue | ConcurrentLinkedQueue |
|---|---|---|---|
| 线程安全机制 | 双锁分离 | 单锁(ReentrantLock) | CAS + 自旋 |
| 容量 | 可选有界(默认无界) | 必须有界 | 无界 |
| 性能 | 高并发下吞吐量高 | 低并发下延迟低 | 读多写少场景更优 |
| 适用场景 | 生产者-消费者、任务队列 | 固定容量缓冲区 | 高并发无锁队列 |
七、注意事项
- 内存风险:默认无界队列可能导致内存溢出(OOM),建议生产环境指定容量。
- 中断处理:阻塞方法(如
put()/take())会响应中断,需捕获InterruptedException并处理中断状态。 - 性能调优:高并发场景下,合理设置队列容量和线程池参数(如
corePoolSize、maximumPoolSize)。
八、源码设计细节
- 哨兵节点:初始化时
head和last均指向空节点,简化插入和删除逻辑。 - 弱一致性迭代器:遍历时基于当前快照,允许并发修改但可能遗漏新元素。
- 内存管理:节点通过
new分配,依赖 GC 回收,避免手动内存管理开销。
九、总结
LinkedBlockingQueue 是 Java 并发编程中高性能阻塞队列的典范,通过双锁分离和条件变量机制,在保证线程安全的同时最大化吞吐量。适用于生产者-消费者模型、线程池任务队列等场景,但需注意无界队列的内存风险。理解其底层实现(如双锁、条件变量)有助于在实际工程中合理应用。
Java中的ConcurrentLinkedQueue详解
Vert.x 4 学习笔记