C++ 无锁队列(Lock-Free Queue)详解
无锁队列是不依赖互斥锁(mutex)、自旋锁等同步原语,仅通过原子操作(Atomic Operation)和内存序(Memory Order) 保证多线程并发安全的队列数据结构。其核心目标是避免锁竞争带来的线程阻塞、上下文切换开销,从而在高并发场景下提升吞吐量和响应延迟。
本文介绍 C++ 无锁队列的核心概念、设计难点及实现方案。涵盖无锁与无等待的区别、原子操作与内存序的作用,重点解析 ABA 问题、内存回收及虚假共享等挑战。详细对比了 Michael-Scott 链表队列与环形数组队列(SPSC/MPMC)的实现逻辑,分析了优缺点及适用场景,并推荐了 Boost.Lockfree 等生产级库。旨在帮助开发者在高并发场景下选择合适的数据结构。

无锁队列是不依赖互斥锁(mutex)、自旋锁等同步原语,仅通过原子操作(Atomic Operation)和内存序(Memory Order) 保证多线程并发安全的队列数据结构。其核心目标是避免锁竞争带来的线程阻塞、上下文切换开销,从而在高并发场景下提升吞吐量和响应延迟。
很多人会混淆这两个概念,需明确区分:
C++ 中无锁队列通常指 Lock-Free 级别,而非严格的 Wait-Free。
无锁队列的安全性完全依赖 std::atomic(C++11 引入)和内存序(Memory Order),二者缺一不可:
std::atomic 原子类型exchange)、比较并交换(compare_exchange_weak/compare_exchange_strong)等操作。compare_exchange_weak(T& expected, T desired)(CAS 操作),核心逻辑是:
expected,则更新为 desired,返回 true;expected 设为原子变量当前值,返回 false。std::atomic<Node*> head;(原子指针,用于队列头/尾的并发修改)。CPU 可能重排指令,编译器也可能优化指令顺序,导致多线程下数据可见性错误。内存序用于约束指令重排和数据同步,常用类型:
memory_order_relaxed:最弱,仅保证操作本身原子,不约束重排和可见性(仅用于独立无关的原子操作)。memory_order_acquire:读操作,禁止后续指令重排到该操作之前;保证该操作读取到的数据是其他线程 release 写入的最新值。memory_order_release:写操作,禁止之前的指令重排到该操作之后;保证该操作写入的数据对其他线程 acquire 可见。memory_order_seq_cst:最强(默认),保证所有线程看到的原子操作顺序一致,开销最大。无锁队列的设计核心是用 acquire/release 内存序替代 seq_cst,在保证正确性的前提下优化性能。
多线程并发修改时,原子变量的值从 A 被改为 B,再改回 A,导致依赖该值的线程误以为'值未变化',从而执行错误操作。
示例(队列头节点删除):
head = A,准备通过 CAS 删除 A(将 head 改为 A->next);head == A(条件成立),将 head 改为 A->next,但此时 A 可能已被 T2 回收(悬空指针),或 A->next 已失效,导致崩溃。std::atomic<std::pair<Node*, uint64_t>>),每次修改指针时版本号递增。CAS 时同时检查指针和版本号,即使指针相同,版本号不同也会拒绝更新。无锁队列的节点是动态分配的,但不能直接 delete——可能有其他线程仍在访问该节点(如 T1 读取节点后,T2 删除节点并 delete,T1 后续访问会触发野指针)。
解决方案与 ABA 问题重叠(风险指针、时代回收),核心思想是延迟回收,确保节点不再被任何线程引用后再释放。
队列的 head 和 tail 原子变量若位于同一缓存行(64 字节),多线程并发修改时会导致缓存行频繁失效(缓存颠簸),严重影响性能。
解决方案:
std::hardware_destructive_interference_size(C++17)将 head 和 tail 分到不同缓存行。head 和 tail 之间插入足够的占位字节(如 64 字节),强制编译器分开存储。head/tail 的并发修改冲突。head(出队)和 tail(入队)的原子修改,逻辑更复杂。由 Michael 和 Scott 于 1996 年提出,是最经典的 MPMC 无锁队列实现,基于单向链表,核心是通过 CAS 原子修改 head 和 tail。
template<typename T>
struct Node {
T data;
std::atomic<Node*> next; // 原子指针,指向后继节点
Node(const T& val) : data(val), next(nullptr) {}
};
template<typename T>
class MSQueue {
private:
// 标记指针:Node* + 版本号(解决 ABA 问题)
struct TaggedPtr {
Node<T>* ptr;
uint64_t tag;
TaggedPtr(Node<T>* p = nullptr, uint64_t t = 0) : ptr(p), tag(t) {}
bool operator==(const TaggedPtr& other) const {
return ptr == other.ptr && tag == other.tag;
}
};
std::atomic<TaggedPtr> head; // 队列头(原子标记指针)
std::atomic<TaggedPtr> tail; // 队列尾(原子标记指针)
// 缓存行对齐,避免虚假共享
char padding[std::hardware_destructive_interference_size - sizeof(std::atomic<TaggedPtr>)];
public:
MSQueue() {
Node<T>* dummy = new Node<T>(T{}); // 哨兵节点(简化入队/出队逻辑)
head.store({dummy, 0}, std::memory_order_relaxed);
tail.store({dummy, 0}, std::memory_order_relaxed);
}
~MSQueue(); // 需遍历释放所有节点
};
tail:
tail(acquire 内存序,确保看到最新的 tail->next);tail->ptr->next 为 nullptr,尝试将其 CAS 改为新节点(release 内存序);tail CAS 改为新节点(release 内存序),入队完成;tail 或 tail->next),重试。void enqueue(const T& val) {
Node<T>* new_node = new Node<T>(val);
TaggedPtr new_tail{new_node, 0};
while (true) {
TaggedPtr current_tail = tail.load(std::memory_order_acquire); // 读 tail,acquire
Node<T>* tail_ptr = current_tail.ptr;
std::atomic<Node*>& next = tail_ptr->next;
// 检查是否有其他线程已更新 tail 的 next
Node<T>* null_ptr = nullptr;
if (next.compare_exchange_strong(
null_ptr, new_node,
std::memory_order_release, // 写 next,release
std::memory_order_relaxed)) {
// 成功将新节点接在 tail 后,更新 tail
TaggedPtr expected_tail{tail_ptr, current_tail.tag};
new_tail.tag = current_tail.tag + 1; // 版本号递增
tail.compare_exchange_weak(
expected_tail, new_tail,
std::memory_order_release, // 写 tail,release
std::memory_order_relaxed);
return;
} else {
// 其他线程已修改 next,尝试帮助更新 tail(优化:减少重试次数)
Node<T>* next_ptr = next.load(std::memory_order_relaxed);
TaggedPtr expected_tail{tail_ptr, current_tail.tag};
TaggedPtr candidate_tail{next_ptr, current_tail.tag + 1};
tail.compare_exchange_weak(
expected_tail, candidate_tail,
std::memory_order_release,
std::memory_order_relaxed);
}
}
}
head:
head 和 tail(acquire 内存序);head->ptr == tail->ptr 且 head->ptr->next 为 nullptr),返回失败;head->ptr->next(实际数据节点);head CAS 改为该数据节点(版本号递增,release 内存序);head->ptr(哨兵节点),返回数据;bool dequeue(T& val) {
while (true) {
TaggedPtr current_head = head.load(std::memory_order_acquire); // 读 head,acquire
TaggedPtr current_tail = tail.load(std::memory_order_acquire); // 读 tail,acquire
Node<T>* head_ptr = current_head.ptr;
Node<T>* tail_ptr = current_tail.ptr;
Node<T>* next_ptr = head_ptr->next.load(std::memory_order_relaxed); // 检查队列是否为空,或 head/tail 是否已被其他线程修改
if (current_head == head.load(std::memory_order_relaxed)) {
if (head_ptr == tail_ptr) {
if (next_ptr == nullptr) {
return false; // 队列空
}
// 帮助更新 tail(入队未完成 tail 更新)
TaggedPtr expected_tail{tail_ptr, current_tail.tag};
TaggedPtr candidate_tail{next_ptr, current_tail.tag + 1};
tail.compare_exchange_weak(
expected_tail, candidate_tail,
std::memory_order_release,
std::memory_order_relaxed);
} else {
if (next_ptr == nullptr) {
continue; // 中间状态,重试
}
// 读取数据(需保证 next_ptr 的数据已完全写入)
val = next_ptr->data;
// CAS 更新 head 为 next_ptr,版本号递增
TaggedPtr expected_head{head_ptr, current_head.tag};
TaggedPtr new_head{next_ptr, current_head.tag + 1};
if (head.compare_exchange_strong(
expected_head, new_head,
std::memory_order_release, // 写 head,release
std::memory_order_relaxed)) {
delete head_ptr; // 释放原哨兵节点(简化版,未处理内存回收问题)
return true;
}
}
}
}
}
上述是简化版实现,未完全解决内存回收(如 delete head_ptr 可能导致其他线程访问失效节点)和复杂 ABA 场景,仅用于理解核心逻辑。生产环境需结合风险指针或时代回收完善。
基于固定大小数组,通过原子索引 head(出队索引)和 tail(入队索引)实现,适合数据量固定、高吞吐场景。
head 仅由消费者修改,tail 仅由生产者修改,无需 CAS,仅需内存序保证可见性,实现最简单、性能最优。head 和 tail 需通过 CAS 原子修改,需处理索引环绕(取模)和满/空判断。#include <atomic>
#include <vector>
template<typename T, size_t Size>
class SPSCQueue {
private:
static constexpr size_t CAPACITY = Size;
std::vector<T> buffer; // 原子索引,用 acquire/release 保证可见性
std::atomic<size_t> head; // 消费者读取索引(仅消费者修改)
char padding1[std::hardware_destructive_interference_size - sizeof(std::atomic<size_t>)];
std::atomic<size_t> tail; // 生产者写入索引(仅生产者修改)
char padding2[std::hardware_destructive_interference_size - sizeof(std::atomic<size_t>)];
public:
SPSCQueue() : buffer(CAPACITY), head(0), tail(0) {}
// 生产者入队(仅单线程调用)
bool enqueue(const T& val) {
const size_t current_tail = tail.load(std::memory_order_relaxed);
const size_t next_tail = (current_tail + 1) % CAPACITY;
if (next_tail == head.load(std::memory_order_acquire)) {
return false; // 队列满
}
buffer[current_tail] = val;
tail.store(next_tail, std::memory_order_release); // 写 tail,release
return true;
}
// 消费者出队(仅单线程调用)
bool dequeue(T& val) {
const size_t current_head = head.load(std::memory_order_relaxed);
if (current_head == tail.load(std::memory_order_acquire)) {
return false; // 队列空
}
val = buffer[current_head];
head.store((current_head + 1) % CAPACITY, std::memory_order_release); // 写 head,release
return true;
}
bool empty() const {
return head.load(std::memory_order_acquire) == tail.load(std::memory_order_acquire);
}
bool full() const {
const size_t next_tail = (tail.load(std::memory_order_acquire) + 1) % CAPACITY;
return next_tail == head.load(std::memory_order_acquire);
}
};
head 和 tail 分别由单个线程修改,仅需 acquire/release 保证生产者写入的数据对消费者可见。next_tail == head(满)和 head == tail(空),牺牲一个元素空间(避免满和空的判断冲突)。std::queue + std::mutex)更简单、性能相当。手动实现无锁队列难度大、易出错,生产环境优先使用成熟库:
lockfree::queue(MPMC)、lockfree::stack(MPSC)、lockfree::spsc_queue(SPSC)。示例:
#include <boost/lockfree/queue.hpp>
boost::lockfree::queue<int> q(1024); // 容量 1024 的 MPMC 无锁队列
q.push(42); // 入队
int val;
q.pop(val); // 出队
无锁队列是 C++ 高并发编程的核心数据结构,其本质是用原子操作替代锁,用内存序保证数据可见性和指令顺序。关键难点是 ABA 问题和内存回收,解决方案包括标记指针、风险指针、时代回收等。
实际开发中,优先选择成熟的第三方库(如 Boost.Lockfree、moodycamel),避免重复造轮子;若需自定义实现,建议从 SPSC 环形队列入手(逻辑简单、不易出错),再逐步挑战 MPMC 链表队列。
核心要点:

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online