Java中的ConcurrentLinkedQueue详解
一、概述
ConcurrentLinkedQueue 是 Java 并发包(java.util.concurrent)中提供的无界非阻塞线程安全队列,基于单向链表实现,采用 CAS(Compare-and-Swap) 操作和 无锁算法 保证并发安全。其核心设计目标是高吞吐量和低延迟,适用于高并发场景下的生产者-消费者模型。
关键特性
- 无界性:理论上容量无限,但受内存限制。
- 非阻塞:操作永不阻塞线程,失败立即返回(如
poll()在队列为空时返回null)。 - 无锁设计:通过 CAS 和自旋机制实现线程安全,避免传统锁的开销。
- FIFO 顺序:严格遵循先进先出原则。
- 弱一致性迭代器:遍历时可能看到部分更新,但不会抛出
ConcurrentModificationException。
二、内部数据结构
1. 节点类 Node<E>
privatestaticclassNode<E>{volatileE item;// 存储元素volatileNode<E> next;// 指向下一个节点// CAS 操作方法booleancasItem(E cmp,E val){...}booleancasNext(Node<E> cmp,Node<E> val){...}}- volatile 修饰:保证多线程可见性。
- CAS 操作:通过
Unsafe类实现原子性更新。
2. 队列指针
- head:指向队列头部(可能滞后于实际头节点)。
- tail:指向队列尾部(可能滞后于实际尾节点)。
- 哨兵节点:初始化时
head和tail均指向一个item=null的哨兵节点。
三、核心方法与实现原理
1. 入队操作(offer/add)
publicbooleanoffer(E e){checkNotNull(e);Node<E> newNode =newNode<>(e);for(Node<E> t = tail, p = t;;){Node<E> q = p.next;if(q ==null){// p 是尾节点if(p.casNext(null, newNode)){if(p != t)casTail(t, newNode);// 更新 tailreturntrue;}}elseif(p == q){// 自引用节点,重置 tail p =(t !=(t = tail))? t : head;}else{ p =(p != t && t !=(t = tail))? t : q;}}}- CAS 竞争:多个线程可能同时尝试插入,仅一个成功。
- tail 滞后更新:仅在
p != t时更新tail,减少 CAS 操作频率。
2. 出队操作(poll)
publicEpoll(){ restartFromHead:for(;;){for(Node<E> h = head, p = h, q;;){E item = p.item;if(item !=null&& p.casItem(item,null)){if(p != h)updateHead(h, p);// 更新 headreturn item;}elseif((q = p.next)==null){updateHead(h, p);returnnull;}elseif(p == q)continue restartFromHead;else p = q;}}}- CAS 移除:将头节点的
item设为null,延迟物理删除。 - head 更新:若
p != h,则更新head指针。
3. 其他方法
- peek():获取头元素但不移除,逻辑与
poll()类似,不修改item。 - size():遍历链表统计元素数,非线程安全(高并发下结果可能不准确)。
- remove(Object o):遍历链表移除首个匹配元素,返回是否成功。
四、无锁并发控制机制
1. CAS 操作
- 原子性更新:通过
Unsafe.compareAndSwapObject实现对item和next的原子修改。 - 自旋重试:CAS 失败时循环重试,而非阻塞线程。
2. 松弛不变量(Relaxed Invariants)
- head 滞后:可能指向已删除节点,仅在必要时更新(如遍历时遇到自引用节点)。
- tail 滞后:减少更新频率,提升吞吐量。
3. 自引用节点
- 标记删除:出队后,原头节点的
next指向自身,防止其他线程误用。 - 垃圾回收:物理删除由 GC 处理,避免频繁内存操作。
五、适用场景
- 高并发生产者-消费者模型:如日志处理、实时事件分发。
- 低延迟系统:如高频交易、游戏服务器。
- 无界缓冲需求:需动态扩展队列长度的场景。
- 弱一致性要求:允许短暂的数据不一致(如遍历时)。
与 LinkedBlockingQueue 对比
| 特性 | ConcurrentLinkedQueue | LinkedBlockingQueue |
|---|---|---|
| 线程安全机制 | CAS 无锁 | 锁(ReentrantLock) |
| 容量限制 | 无界 | 可选有界/无界 |
| 阻塞操作 | 无 | 支持 put()/take() |
| 吞吐量 | 高 | 中等 |
| 内存占用 | 节点结构更轻量 | 可能更高 |
六、最佳实践
- 避免频繁调用
size():高并发下性能差,建议通过外部计数器统计。 - 合理预估容量:虽无界,但内存耗尽可能引发 OOM。
- 结合其他同步机制:如需精确控制,可与
Semaphore或CountDownLatch联用。 - 弱一致性遍历:接受遍历时可能遗漏新元素,适用于非强一致性场景。
七、源码设计细节
- 哨兵节点:初始化时
head和tail指向同一个哨兵节点,简化边界条件处理。 - 自引用节点:出队后原头节点
next=self,标记为待回收。 - CAS 优化:节点构造时使用
Unsafe.putObject替代volatile写操作,减少内存屏障开销。
八、总结
ConcurrentLinkedQueue 是 Java 并发编程中高性能无锁队列的典范,通过 CAS 和松弛不变量设计,在保证线程安全的同时最大化吞吐量。适用于对延迟敏感、无需严格容量控制的场景,但需注意其无界特性可能带来的内存风险。理解其底层机制(如 CAS、自旋、自引用节点)有助于在实际工程中合理应用。
LinkedList详解-Deque接口链表实现方案
Vert.x 4 学习笔记-EventLoop的回调队列原理与机制详解