C++ 中CAS原子操作详解
在 C++ 中,CAS 操作主要通过 <atomic> 头文件中的 std::atomic 类模板提供的成员函数 compare_exchange_weak和 compare_exchange_strong来实现。
1. CAS 的核心逻辑
CAS 操作包含三个操作数:
内存值 (V):要更新的变量的值。预期原值 (E, Expected):线程认为该变量当前应该有的值(通常是之前读取的快照)。新值 (N, New):线程想要写入的新值。
原子操作流程如下,核心是比较、交换、重复:
比较:检查内存位置 V 的当前值是否等于预期值 E。交换(如果相等):如果相等 (V == E),说明在读取后没有其他线程修改过该变量,则将 V 更新为新值 N。操作成功,返回true。重试/失败(如果不相等):如果不相等 (V != E),说明有其他线程抢先修改了该变量。此时不进行更新,并将内存中的最新值 V 赋值给预期值 E(这样下次重试时就有了最新的参照)。操作失败,返回false。
2. C++ 中的 CAS 函数
在 C++ 中,std::atomic 提供了两个版本的 CAS:
A. compare_exchange_weak(弱 CAS)
即使内存中的值与预期值相等,它也可能返回 false,称为“伪失败”。
原因在于在某些硬件架构(如 ARM、PowerPC)上,CAS 是通过 LL/SC指令实现的,上下文切换等因素可能导致 SC 指令失败。
适用场景:循环算法中。因为在循环中,伪失败只会导致多一次循环,而weak版本在某些平台上通常比strong版本性能更高。
B. compare_exchange_strong (强 CAS)
只有当内存值确实不等于预期值时才会返回 false。
适用场景:非循环的操作,或者当重试的代价非常昂贵时。它在内部可能已经包含了一个小循环来处理伪失败。
3. 代码示例:标准的 CAS 循环
这是最经典的使用模式,用于原子地修改一个变量,向栈中压入一个元素。
#include <atomic> #include <iostream> struct Node { int data; Node* next; }; std::atomic<Node*> head(nullptr); void push(int data) { Node* new_node = new Node{data, nullptr}; // 步骤 A: 获取快照 Node* expected = head.load(); do { // 步骤 B: 构造新状态基于快照 new_node->next = expected; // 步骤 C: 尝试原子交换 // 如果 head == expected,则 head = new_node,返回 true,退出循环。 // 如果 head != expected,则 expected = head (自动更新),返回 false,继续循环。 } while (!head.compare_exchange_weak(expected, new_node)); }需要注意的是 expected是 T&(引用),
当 CAS 失败时:函数不仅返回 false,还会自动将内存中最新的实际值写入 expected变量中。
4. ABA问题
ABA 问题是指在使用 CAS(Compare-And-Swap)操作时,一个变量的值经历了 A -> B -> A 的变化过程,但 CAS 仅仅检查值是否相等,因此误认为该变量从未被修改过,从而导致逻辑错误。
核心本质在于CAS 只能检测“状态”是否一致,无法检测“时序”是否发生过变化。
我们可以举一个例子来形象的形容,
场景:桌子上有一杯水(状态 A)。过程:你看到桌上有杯水,转身去接电话。你室友过来把水喝完了(变成状态 B)。你室友觉得不好意思,又接了一杯水放在原处(变回状态 A)。你接完电话回来,看到水还在,认为没人动过,端起来就喝。后果:虽然看起来还是水,但杯子可能被污染了,或者水的温度变了。你以为没变,实际上已经“偷梁换柱”了。
ABA问题对于一些纯数值修改中通常无害,但在涉及指针和内存管理的动态数据结构中是致命的。
假设我们有一个无锁栈,栈顶指针为 Top。 栈内元素:Top-> Node1 -> Node2
故障出现过程:
线程 1 准备出栈(Pop):
读取Top为Node1。读取Node1->next为Node2。(此时线程 1 被操作系统调度挂起,暂停执行)
线程 2 疯狂操作:
Pop:将Node1弹出,此时Top变为Node2。Pop:将Node2弹出,此时Top变为nullptr(或者其他节点)。Push:线程 2 创建了一个新节点,恰好操作系统的内存分配器将刚刚释放的Node1的内存地址重用给了这个新节点。Push:将这个新节点(地址仍然是Node1)压入栈。现状:Top指向了新的Node1,但Node1->next不再指向Node2(因为Node2已经被释放了)。
线程 1 恢复执行:
执行 CAS 操作:compare_exchange(Top, expected=Node1, new=Node2)。判断:检查Top当前的值,发现地址确实是Node1(尽管内容变了,但地址没变)。交换:CAS 成功!线程 1 将Top修改为Node2。
后果:
Top现在指向了Node2。但是Node2早在步骤 2 中被线程 2 释放回内存池了!任何后续访问Top的操作都会访问野指针,导致程序崩溃或数据错乱。
从底层原因看为什么会发生,首先是操作系统或内存分配器(如 malloc/free)倾向于复用刚刚释放的内存块(热缓存),导致指针地址在短时间内复现,而且CAS 是浅层比较,它只管 64 位(或 32 位)的二进制数据是否相同。
对于以上ABA问题,我们可以采取版本号、std::atomic<std::shared_ptr>、Hazard Pointers来解决。
在此,我们介绍下利用版本号进行控制。
首先定义一个结构体,它将指针和版本号捆绑在一起::
// 假设 Node 是栈中的节点结构 struct Node { int data; Node* next; }; // 复合状态结构体 struct PointerWithVersion { Node* pointer; // 栈顶指针 (Top) unsigned int version; // 版本计数器 }; // 栈顶指针现在是这个复合结构的原子变量 std::atomic<PointerWithVersion> top;然后实现正确的Pop操作
// 模拟无锁栈的 Pop 操作 Node* pop() { // 1. 原子地读取当前的 (指针 A, 版本 v1) 状态 // 这也是线程 1 的“预期值”快照 PointerWithVersion expected = top.load(); // 2. 构造线程 1 想要的新状态: // 指针变为 A->next,版本号递增 PointerWithVersion desired; // 如果栈空,直接返回 if (expected.pointer == nullptr) { return nullptr; } // 线程 1 进入 CAS 循环 do { // 确保在循环内读取最新状态,因为 expected 可能会在 CAS 失败时被更新 Node* current_node = expected.pointer; // 构造新状态:指向下一个节点,版本号递增 desired.pointer = current_node->next; desired.version = expected.version + 1; // 关键的原子操作: // 尝试用 (expected, desired) 来更新 top // 如果 top 仍等于 expected,则 top = desired,返回 true。 // 如果 top 不等于 expected,则 top 被更新为 top 的最新值,返回 false,继续循环。 } while (!top.compare_exchange_weak(expected, desired)); // CAS 成功,current_node 已被弹出 return expected.pointer; }现在我们来分析一下ABA 问题的解决过程,利用表格进行对照
| 状态 | 无版本号 (单指针) | 有版本号 (复合结构) |
| 初始状态 | Top = Node1 | Top = (Node1, 1) |
| 线程 1 快照 | expected = Node1 | expected = (Node1, 1) |
| 线程 2 操作 | 导致 Top 变为 Node1 (地址复用) | 导致 Top 变为 (Node1, 3) (地址复用, 版本递增) |
| 线程 1 恢复执行,尝试 CAS | CAS(Top, Node1, Node2) | CAS(Top, (Node1, 1), (Node2, 2)) |
| CAS 比较 | 比较 Node1 和 Node1。值相等,CAS 成功。 | 比较 (Node1, 3) 和 (Node1, 1)。值不等,CAS 失败。 |
| 结果 | Top 变为野指针 Node2。 | expected 被更新为 (Node1, 3)。线程 1 发现冲突,重新循环,基于最新的状态继续尝试操作。 |
版本号将 CAS 的比较粒度从一个地址提升到了地址与历史状态的组合。
即使指针 A 变回了 A,但版本号 v1已经变成了 v3。线程 1 的预期值 A_{v1} 永远不会等于内存中的最新值 A_{v3},因此 CAS 会失败,线程被迫读取最新的 (Node1, 3) 状态并重试,从而阻止了它将栈顶链接到早已被释放的旧节点上。
5. 自旋锁
5.1 原子“交换”
使用std::atomic<bool> 实现锁,并不是简单的 if 判断,而是利用了 原子的“读-改-写” 能力。
加锁逻辑:
线程尝试将布尔值设置为true(表示“我占用了”)。exchange(true)函数会把变量设为true,并返回旧值。如果返回false:说明之前是解锁状态,我现在抢到了锁。成功!如果返回true:说明之前已经是true(被别人占用了)。失败,继续循环重试(自旋)。
解锁逻辑
将布尔值设回false(store(false))。
5.2 代码实现
#include <atomic> #include <thread> class SpinLock { private: std::atomic<bool> flag = {false}; // false 代表未加锁,true 代表已加锁 public: void lock() { // exchange(true) 会把 flag 设为 true,并返回原来的值。 // 只要返回 true,说明原来就被锁住了,我就在 while 里一直空转(Spin)。 // 只有返回 false,说明原来没锁,我成功锁住了,跳出循环。 // std::memory_order_acquire 保证之后的代码不会被重排到这一行之前 while (flag.exchange(true, std::memory_order_acquire)) { } } void unlock() { // 解锁很简单,直接设为 false。 // std::memory_order_release 保证之前的代码不会被重排到这一行之后 flag.store(false, std::memory_order_release); } };可以利用该自旋锁代替std::mutex,进行性能对比
| 特性 | std::mutex (互斥锁) | atomic 自旋锁 (Spinlock) |
| 等待方式 | 睡眠:如果拿不到锁,操作系统会把线程挂起(放入等待队列) | 忙等:如果拿不到锁,CPU 一直空转循环,不放弃时间片 |
| 上下文切换 | 有:线程挂起/唤醒需要内核介入,开销较大(几微秒级别) | 无:完全在用户态执行,没有系统调用,开销极小(纳秒级别) |
| CPU 占用 | 低(等待时不耗 CPU) | 高(等待时 CPU 满负荷空转) |
| 适用场景 | 临界区代码较长,或有 I/O 操作 | 临界区短,且竞争不激烈 |
在 C++ 中,最标准的自旋锁其实推荐使用 std::atomic_flag,而 std::atomic<bool>上述实现逻辑是为了更好的理解自旋锁的形成。
它的接口非常简单,专门为自旋锁设计:
test_and_set(): 相当于exchange(true)。clear(): 相当于store(false)。