
C++ 内存序深度解析:6 种内存模型与多线程同步
C++ 内存序是并发编程中控制内存可见性和操作顺序的关键机制。详解 std::memory_order_seq_cst、acquire、release、acq_rel、relaxed 及 consume 六种内存序的语义差异与适用场景。通过双重检查锁定、无锁队列等代码示例,阐述如何正确配对使用内存序以避免数据竞争和重排问题。提供性能对比基准测试方法、常见误区分析及最佳实践指南,帮助开发者在保证程序正确性的前提下实现高性能并发。

C++ 内存序是并发编程中控制内存可见性和操作顺序的关键机制。详解 std::memory_order_seq_cst、acquire、release、acq_rel、relaxed 及 consume 六种内存序的语义差异与适用场景。通过双重检查锁定、无锁队列等代码示例,阐述如何正确配对使用内存序以避免数据竞争和重排问题。提供性能对比基准测试方法、常见误区分析及最佳实践指南,帮助开发者在保证程序正确性的前提下实现高性能并发。


在 C++ 多线程编程中,内存序(Memory Order)是一个至关重要但又常常被误解的概念。C++11 引入了原子操作和 6 种内存序,为开发者提供了细粒度的内存同步控制。理解这些内存序不仅有助于编写正确的高并发程序,还能在需要时实现极致的性能优化。
现代处理器和编译器为了优化性能,会对内存操作进行重排序:
// 源代码
int x = 0, y = 0;
bool ready = false;
// 线程 1
void thread1() {
x = 42; // 操作 A
y = 17; // 操作 B
ready = true; // 操作 C
}
// 线程 2
void thread2() {
if (ready) { // 操作 D
assert(x == 42); // 可能失败!
assert(y == 17); // 可能失败!
}
}
由于内存重排,操作 C 可能被重排到操作 A 和 B 之前,导致线程 2 看到 ready == true 但 x 和 y 还没有被赋值。
不同处理器核心有各自的缓存,一个核心的写操作不会立即对其他核心可见:
// 没有正确同步的代码
std::atomic<int> data{0};
bool ready = false; // 非原子变量
// 线程 1
void producer() {
data.store(42, std::memory_order_relaxed);
ready = true; // 问题:非原子操作,可见性无法保证
}
// 线程 2
void consumer() {
while (!ready) { // 可能永远看不到 ready 变为 true
std::this_thread::yield();
}
int value = data.load(std::memory_order_relaxed); // value 可能不是 42!
}
C++11 提供了 std::atomic 模板,支持原子操作:
#include <atomic>
std::atomic<int> counter{0};
// 线程安全的自增
void increment() {
counter.fetch_add(1, std::memory_order_relaxed);
}
C++ 定义了 6 种内存顺序,分为 3 大类:
memory_order_seq_cstmemory_order_acquire、memory_order_release、memory_order_acq_relmemory_order_relaxed、memory_order_consume(已弃用)memory_order_seq_cst(顺序一致性)最强的一致性保证,是原子操作的默认内存序。
std::atomic<bool> x{false}, y{false};
std::atomic<int> z{0};
void write_x() {
x.store(true, std::memory_order_seq_cst); // #1
}
void write_y() {
y.store(true, std::memory_order_seq_cst); // #2
}
void read_x_then_y() {
while (!x.load(std::memory_order_seq_cst)) {} // #3
if (y.load(std::memory_order_seq_cst)) { // #4
++z;
}
}
void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)) {} // #5
if (x.load(std::memory_order_seq_cst)) { // #6
++z;
}
}
顺序一致性保证:如果 read_x_then_y 看到 x=true 然后 y=false,那么 read_y_then_x 不可能看到 y=true 然后 x=false。
特点:
memory_order_acquire(获取语义)建立读操作的同步关系,确保后续的读/写操作不会被重排到该操作之前。
std::atomic<bool> ready{false};
int data = 0;
void producer() {
data = 42; // #1
ready.store(true, std::memory_order_release); // #2
}
void consumer() {
while (!ready.load(std::memory_order_acquire)) {} // #3
std::cout << data << std::endl; // 保证看到 42 // #4
}
获取 - 释放配对保证:
#1(非原子写)happens-before #2(原子释放存储)#3(原子获取加载)synchronizes-with #2#4(非原子读)happens-after #3因此 #1 happens-before #4,消费者保证看到 data = 42。
memory_order_release(释放语义)建立写操作的同步关系,确保之前的读/写操作不会被重排到该操作之后。
// 自旋锁的实现
class SpinLock {
std::atomic<bool> locked{false};
public:
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) { // 自旋等待
}
}
void unlock() {
locked.store(false, std::memory_order_release);
}
};
关键作用:
memory_order_acq_rel(获取 - 释放语义)同时具有获取和释放语义,用于读 - 修改 - 写操作。
std::atomic<int> counter{0};
void increment() {
// fetch_add 是读 - 修改 - 写操作
counter.fetch_add(1, std::memory_order_acq_rel);
}
// 等价于:
void increment_manual() {
int old = counter.load(std::memory_order_acquire);
counter.store(old + 1, std::memory_order_release);
}
使用场景:
memory_order_relaxed(宽松语义)最弱的内存序保证,只保证原子性,不保证同步和顺序。
std::atomic<int> x{0}, y{0};
void thread1() {
x.store(1, std::memory_order_relaxed); // #1
y.store(1, std::memory_order_relaxed); // #2
// #1 和 #2 可能被重排
}
void thread2() {
int r1 = y.load(std::memory_order_relaxed); // #3
int r2 = x.load(std::memory_order_relaxed); // #4
// 可能看到 r1=1, r2=0
std::cout << "r1=" << r1 << ", r2=" << r2 << std::endl;
}
适用场景:
memory_order_consume(消费语义,已弃用)C++17 已弃用,不建议使用。原本用于数据依赖关系的优化。
// 不推荐使用
std::atomic<Data*> data_ptr{nullptr};
int value = 0;
void producer() {
Data* p = new Data{42};
value = 100;
data_ptr.store(p, std::memory_order_release);
}
void consumer() {
Data* p;
while (!(p = data_ptr.load(std::memory_order_consume))) {}
// 保证看到 *p 的正确值,但不保证看到 value=100
std::cout << p->value << std::endl;
}
// 线程安全的单例模式
class Singleton {
private:
static std::atomic<Singleton*> instance;
static std::mutex mutex;
Singleton() = default;
public:
static Singleton* getInstance() {
Singleton* tmp = instance.load(std::memory_order_acquire); // 第一次检查(无锁)
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(mutex);
tmp = instance.load(std::memory_order_relaxed); // 第二次检查(持有锁)
if (tmp == nullptr) {
tmp = new Singleton(); // 关键:使用 release 确保构造函数完成后再发布指针
instance.store(tmp, std::memory_order_release);
}
}
return tmp;
}
// 禁止拷贝
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
};
std::atomic<Singleton*> Singleton::instance{nullptr};
std::mutex Singleton::mutex;
template<typename T>
class LockFreeQueue {
private:
struct Node {
T data;
std::atomic<Node*> next;
Node(const T& value) : data(value), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* dummy = new Node(T());
head.store(dummy, std::memory_order_relaxed);
tail.store(dummy, std::memory_order_relaxed);
}
~LockFreeQueue() {
while (Node* node = head.load(std::memory_order_relaxed)) {
head.store(node->next.load(std::memory_order_relaxed), std::memory_order_relaxed);
delete node;
}
}
void enqueue(const T& value) {
Node* new_node = new Node(value);
Node* old_tail = nullptr;
while (true) {
old_tail = tail.load(std::memory_order_acquire);
Node* next = old_tail->next.load(std::memory_order_acquire); // 检查 tail 是否仍然正确
if (old_tail == tail.load(std::memory_order_relaxed)) {
if (next == nullptr) {
// 尝试链接新节点
if (old_tail->next.compare_exchange_weak(next, new_node, std::memory_order_release, std::memory_order_relaxed)) {
break; // 成功
}
} else {
// 帮助推进 tail
tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_relaxed);
}
}
}
// 推进 tail
tail.compare_exchange_weak(old_tail, new_node, std::memory_order_release, std::memory_order_relaxed);
}
bool dequeue(T& value) {
while (true) {
Node* old_head = head.load(std::memory_order_acquire);
Node* old_tail = tail.load(std::memory_order_acquire);
Node* next = old_head->next.load(std::memory_order_acquire); // 检查一致性
if (old_head == head.load(std::memory_order_relaxed)) {
if (old_head == old_tail) {
if (next == nullptr) {
return false; // 队列为空
}
// 帮助推进 tail
tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_relaxed);
} else {
value = next->data;
if (head.compare_exchange_weak(old_head, next, std::memory_order_release, std::memory_order_relaxed)) {
delete old_head;
return true;
}
}
}
}
}
};
| 场景 | 推荐内存序 | 理由 |
|---|---|---|
| 简单的原子计数器 | relaxed | 只需要原子性,不需要同步其他数据 |
| 自旋锁的获取 | acquire | 建立获取锁后的读屏障 |
| 自旋锁的释放 | release | 建立释放锁前的写屏障 |
| CAS 操作(成功时) | acq_rel | 同时需要获取和释放语义 |
| 需要全局顺序 | seq_cst | 默认选择,最安全 |
| 标志位同步 | release/acquire | 典型的生产者 - 消费者模式 |
| 引用计数 | acq_rel | 确保增减操作的同步 |
// 高性能统计计数器
class PerformanceCounter {
// 每个线程有自己的缓存
struct alignas(64) ThreadLocal {
// 缓存行对齐,避免伪共享
int64_t local_count{0};
char padding[64 - sizeof(int64_t)]; // 填充到缓存行大小
};
std::vector<ThreadLocal> thread_local_counts;
std::atomic<int64_t> global_count{0};
public:
PerformanceCounter(size_t num_threads) : thread_local_counts(num_threads) {}
void increment(size_t thread_id) {
// 线程本地累加,无锁
thread_local_counts[thread_id].local_count++;
// 定期同步到全局计数器
if (thread_local_counts[thread_id].local_count % 1000 == 0) {
int64_t local = thread_local_counts[thread_id].local_count;
thread_local_counts[thread_id].local_count = 0;
// 宽松内存序即可,因为这是独立的统计
global_count.fetch_add(local, std::memory_order_relaxed);
}
}
int64_t get_total() const {
// 需要获取所有线程本地的值
int64_t total = global_count.load(std::memory_order_relaxed);
for (const auto& local : thread_local_counts) {
total += local.local_count;
}
return total;
}
};
// 无锁的发布 - 订阅机制
template<typename T>
class Publisher {
struct SharedData {
T data;
std::atomic<int> version{0};
};
alignas(64) SharedData shared[2]; // 双缓冲
std::atomic<int> current_index{0};
public:
void publish(const T& new_data) {
int idx = current_index.load(std::memory_order_relaxed);
int next_idx = 1 - idx; // 切换到另一个缓冲区
// 准备新数据
shared[next_idx].data = new_data;
// 发布:先增加版本号,再切换缓冲区
shared[next_idx].version.store(shared[idx].version.load(std::memory_order_acquire) + 1, std::memory_order_release);
// 切换当前缓冲区(release 确保前面的写对其他线程可见)
current_index.store(next_idx, std::memory_order_release);
}
bool subscribe(T& out_data, int& last_version) {
int idx = current_index.load(std::memory_order_acquire);
int version = shared[idx].version.load(std::memory_order_acquire);
if (version != last_version) {
// 获取数据(acquire 确保看到发布者的所有写)
out_data = shared[idx].data;
last_version = version;
return true;
}
return false;
}
};
#include <benchmark/benchmark.h>
#include <atomic>
#include <thread>
#include <vector>
// 基准测试:不同内存序的性能对比
static void BM_AtomicIncrement_Relaxed(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_relaxed);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_Relaxed);
static void BM_AtomicIncrement_AcqRel(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_acq_rel);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_AcqRel);
static void BM_AtomicIncrement_SeqCst(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_seq_cst);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_SeqCst);
// 多线程竞争测试
static void BM_AtomicWithContention(benchmark::State& state) {
const int kNumThreads = 4;
std::atomic<int> counter{0};
std::vector<std::thread> threads;
for (auto _ : state) {
counter.store(0, std::memory_order_relaxed);
threads.clear();
for (int i = 0; i < kNumThreads; ++i) {
threads.emplace_back([&counter, &state]() {
for (int j = 0; j < state.range(0); ++j) {
counter.fetch_add(1, std::memory_order_relaxed);
}
});
}
for (auto& t : threads) {
t.join();
}
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicWithContention)->Arg(1000)->Arg(10000);
BENCHMARK_MAIN();
// 错误示例:不匹配的内存序
std::atomic<bool> flag{false};
int data = 0;
void thread1() {
data = 42;
flag.store(true, std::memory_order_release); // 释放
}
void thread2() {
// 错误:应该使用 acquire 来匹配 release
if (flag.load(std::memory_order_relaxed)) { // 应该是 acquire!
std::cout << data << std::endl; // 可能看不到 42
}
}
// 不必要的性能开销
class Config {
std::atomic<bool> updated{false};
std::string value;
public:
void update(const std::string& new_value) {
value = new_value;
// 过度使用:其实只需要 release
updated.store(true, std::memory_order_seq_cst);
}
std::string get() {
// 过度使用:其实只需要 acquire
while (!updated.load(std::memory_order_seq_cst)) {}
return value;
}
};
// 危险:非原子数据没有正确同步
struct Data {
int a, b, c;
};
std::atomic<Data*> ptr{nullptr};
void writer() {
Data* p = new Data{1, 2, 3};
// 问题:Data 的成员初始化可能没有 happens-before 关系
ptr.store(p, std::memory_order_release);
}
void reader() {
Data* p = ptr.load(std::memory_order_acquire);
if (p) {
// 可能看到部分初始化的数据!
std::cout << p->a << ", " << p->b << ", " << p->c << std::endl;
}
}
happens-before 关系:
seq_cstrelease 必须与 acquire 或 acq_rel 配对memory_order_seq_cst:在不确定时,使用默认值最安全// 最后的建议:保持简单
class SimpleSafeCounter {
std::atomic<int> count{0}; // 默认使用 seq_cst
public:
void increment() {
count.fetch_add(1); // 默认内存序,安全简单
}
int get() const {
return count.load(); // 默认内存序,安全简单
}
};
// 除非有明确的性能需求,否则保持简单是最好的策略
C++ 的内存序是一个强大但复杂的特性。通过理解 6 种内存序的语义和适用场景,开发者可以在保证正确性的前提下,编写出高性能的并发程序。记住:正确性永远比性能更重要。只有在确实需要优化性能,并且完全理解内存序语义时,才应该使用非默认的内存序。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online