跳到主要内容C++ 内存序深度解析:6 种内存模型与多线程同步 | 极客日志C++算法
C++ 内存序深度解析:6 种内存模型与多线程同步
C++ 内存序是并发编程中控制内存可见性和操作顺序的关键机制。详解 std::memory_order_seq_cst、acquire、release、acq_rel、relaxed 及 consume 六种内存序的语义差异与适用场景。通过双重检查锁定、无锁队列等代码示例,阐述如何正确配对使用内存序以避免数据竞争和重排问题。提供性能对比基准测试方法、常见误区分析及最佳实践指南,帮助开发者在保证程序正确性的前提下实现高性能并发。
黑客27 浏览 
引言
在 C++ 多线程编程中,内存序(Memory Order)是一个至关重要但又常常被误解的概念。C++11 引入了原子操作和 6 种内存序,为开发者提供了细粒度的内存同步控制。理解这些内存序不仅有助于编写正确的高并发程序,还能在需要时实现极致的性能优化。
1. 为什么需要内存序?
1.1 现代处理器的内存重排
现代处理器和编译器为了优化性能,会对内存操作进行重排序:
int x = 0, y = 0;
bool ready = false;
void thread1() {
x = 42;
y = 17;
ready = true;
}
void thread2() {
if (ready) {
assert(x == 42);
assert(y == 17);
}
}
由于内存重排,操作 C 可能被重排到操作 A 和 B 之前,导致线程 2 看到 ready == true 但 x 和 y 还没有被赋值。
1.2 内存可见性问题
不同处理器核心有各自的缓存,一个核心的写操作不会立即对其他核心可见:
std::atomic<int> data{0};
bool ready = false;
{
data.(, std::memory_order_relaxed);
ready = ;
}
{
(!ready) {
std::this_thread::();
}
value = data.(std::memory_order_relaxed);
}
void producer()
store
42
true
void consumer()
while
yield
int
load
2. 内存模型基础概念
2.1 原子操作
C++11 提供了 std::atomic 模板,支持原子操作:
#include <atomic>
std::atomic<int> counter{0};
void increment() {
counter.fetch_add(1, std::memory_order_relaxed);
}
2.2 内存顺序的类型
- 顺序一致性:
memory_order_seq_cst
- 获取 - 释放语义:
memory_order_acquire、memory_order_release、memory_order_acq_rel
- 宽松顺序:
memory_order_relaxed、memory_order_consume(已弃用)
3. 6 种内存序详解
3.1 memory_order_seq_cst(顺序一致性)
std::atomic<bool> x{false}, y{false};
std::atomic<int> z{0};
void write_x() {
x.store(true, std::memory_order_seq_cst);
}
void write_y() {
y.store(true, std::memory_order_seq_cst);
}
void read_x_then_y() {
while (!x.load(std::memory_order_seq_cst)) {}
if (y.load(std::memory_order_seq_cst)) {
++z;
}
}
void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)) {}
if (x.load(std::memory_order_seq_cst)) {
++z;
}
}
顺序一致性保证:如果 read_x_then_y 看到 x=true 然后 y=false,那么 read_y_then_x 不可能看到 y=true 然后 x=false。
- 所有操作按某个全局顺序执行
- 所有线程看到相同的操作顺序
- 性能开销最大
3.2 memory_order_acquire(获取语义)
建立读操作的同步关系,确保后续的读/写操作不会被重排到该操作之前。
std::atomic<bool> ready{false};
int data = 0;
void producer() {
data = 42;
ready.store(true, std::memory_order_release);
}
void consumer() {
while (!ready.load(std::memory_order_acquire)) {}
std::cout << data << std::endl;
}
#1(非原子写)happens-before #2(原子释放存储)
#3(原子获取加载)synchronizes-with #2
#4(非原子读)happens-after #3
因此 #1 happens-before #4,消费者保证看到 data = 42。
3.3 memory_order_release(释放语义)
建立写操作的同步关系,确保之前的读/写操作不会被重排到该操作之后。
class SpinLock {
std::atomic<bool> locked{false};
public:
void lock() {
while (locked.exchange(true, std::memory_order_acquire)) {
}
}
void unlock() {
locked.store(false, std::memory_order_release);
}
};
- 释放锁时使用 release 语义
- 确保临界区内的所有操作在锁释放前完成
- 让其他线程在获取锁时能看到这些操作
3.4 memory_order_acq_rel(获取 - 释放语义)
同时具有获取和释放语义,用于读 - 修改 - 写操作。
std::atomic<int> counter{0};
void increment() {
counter.fetch_add(1, std::memory_order_acq_rel);
}
void increment_manual() {
int old = counter.load(std::memory_order_acquire);
counter.store(old + 1, std::memory_order_release);
}
- 自旋锁的加锁操作(exchange)
- 引用计数的增减
- 任何需要同时建立前后同步关系的操作
3.5 memory_order_relaxed(宽松语义)
最弱的内存序保证,只保证原子性,不保证同步和顺序。
std::atomic<int> x{0}, y{0};
void thread1() {
x.store(1, std::memory_order_relaxed);
y.store(1, std::memory_order_relaxed);
}
void thread2() {
int r1 = y.load(std::memory_order_relaxed);
int r2 = x.load(std::memory_order_relaxed);
std::cout << "r1=" << r1 << ", r2=" << r2 << std::endl;
}
- 统计计数器
- 标志位(不需要与其他数据同步)
- 性能要求极高的场景
3.6 memory_order_consume(消费语义,已弃用)
C++17 已弃用,不建议使用。原本用于数据依赖关系的优化。
std::atomic<Data*> data_ptr{nullptr};
int value = 0;
void producer() {
Data* p = new Data{42};
value = 100;
data_ptr.store(p, std::memory_order_release);
}
void consumer() {
Data* p;
while (!(p = data_ptr.load(std::memory_order_consume))) {}
std::cout << p->value << std::endl;
}
4. 代码示例与分析
4.1 双重检查锁定模式
class Singleton {
private:
static std::atomic<Singleton*> instance;
static std::mutex mutex;
Singleton() = default;
public:
static Singleton* getInstance() {
Singleton* tmp = instance.load(std::memory_order_acquire);
if (tmp == nullptr) {
std::lock_guard<std::mutex> lock(mutex);
tmp = instance.load(std::memory_order_relaxed);
if (tmp == nullptr) {
tmp = new Singleton();
instance.store(tmp, std::memory_order_release);
}
}
return tmp;
}
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
};
std::atomic<Singleton*> Singleton::instance{nullptr};
std::mutex Singleton::mutex;
4.2 无锁队列实现
template<typename T>
class LockFreeQueue {
private:
struct Node {
T data;
std::atomic<Node*> next;
Node(const T& value) : data(value), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* dummy = new Node(T());
head.store(dummy, std::memory_order_relaxed);
tail.store(dummy, std::memory_order_relaxed);
}
~LockFreeQueue() {
while (Node* node = head.load(std::memory_order_relaxed)) {
head.store(node->next.load(std::memory_order_relaxed), std::memory_order_relaxed);
delete node;
}
}
void enqueue(const T& value) {
Node* new_node = new Node(value);
Node* old_tail = nullptr;
while (true) {
old_tail = tail.load(std::memory_order_acquire);
Node* next = old_tail->next.load(std::memory_order_acquire);
if (old_tail == tail.load(std::memory_order_relaxed)) {
if (next == nullptr) {
if (old_tail->next.compare_exchange_weak(next, new_node, std::memory_order_release, std::memory_order_relaxed)) {
break;
}
} else {
tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_relaxed);
}
}
}
tail.compare_exchange_weak(old_tail, new_node, std::memory_order_release, std::memory_order_relaxed);
}
bool dequeue(T& value) {
while (true) {
Node* old_head = head.load(std::memory_order_acquire);
Node* old_tail = tail.load(std::memory_order_acquire);
Node* next = old_head->next.load(std::memory_order_acquire);
if (old_head == head.load(std::memory_order_relaxed)) {
if (old_head == old_tail) {
if (next == nullptr) {
return false;
}
tail.compare_exchange_weak(old_tail, next, std::memory_order_release, std::memory_order_relaxed);
} else {
value = next->data;
if (head.compare_exchange_weak(old_head, next, std::memory_order_release, std::memory_order_relaxed)) {
delete old_head;
return true;
}
}
}
}
}
};
5. 内存序的选择策略
5.1 决策流程图
- 是否读操作写操作读 - 修改 - 写
- 是 / 简单优先否 / 性能优先
- 开始选择内存序需要同步非原子数据?
- 读操作还是写操作?
- 使用 memory_order_relaxed
- 使用 memory_order_acquire
- 使用 memory_order_release
- 使用 memory_order_acq_rel
- 需要全局顺序一致性?
- 使用 memory_order_seq_cst
- 保持原选择验证正确性测试和基准测试
- 结束
5.2 选择指南表格
| 场景 | 推荐内存序 | 理由 |
|---|
| 简单的原子计数器 | relaxed | 只需要原子性,不需要同步其他数据 |
| 自旋锁的获取 | acquire | 建立获取锁后的读屏障 |
| 自旋锁的释放 | release | 建立释放锁前的写屏障 |
| CAS 操作(成功时) | acq_rel | 同时需要获取和释放语义 |
| 需要全局顺序 | seq_cst | 默认选择,最安全 |
| 标志位同步 | release/acquire | 典型的生产者 - 消费者模式 |
| 引用计数 | acq_rel | 确保增减操作的同步 |
6. 实际应用场景
6.1 性能计数器
class PerformanceCounter {
struct alignas(64) ThreadLocal {
int64_t local_count{0};
char padding[64 - sizeof(int64_t)];
};
std::vector<ThreadLocal> thread_local_counts;
std::atomic<int64_t> global_count{0};
public:
PerformanceCounter(size_t num_threads) : thread_local_counts(num_threads) {}
void increment(size_t thread_id) {
thread_local_counts[thread_id].local_count++;
if (thread_local_counts[thread_id].local_count % 1000 == 0) {
int64_t local = thread_local_counts[thread_id].local_count;
thread_local_counts[thread_id].local_count = 0;
global_count.fetch_add(local, std::memory_order_relaxed);
}
}
int64_t get_total() const {
int64_t total = global_count.load(std::memory_order_relaxed);
for (const auto& local : thread_local_counts) {
total += local.local_count;
}
return total;
}
};
6.2 发布 - 订阅模式
template<typename T>
class Publisher {
struct SharedData {
T data;
std::atomic<int> version{0};
};
alignas(64) SharedData shared[2];
std::atomic<int> current_index{0};
public:
void publish(const T& new_data) {
int idx = current_index.load(std::memory_order_relaxed);
int next_idx = 1 - idx;
shared[next_idx].data = new_data;
shared[next_idx].version.store(shared[idx].version.load(std::memory_order_acquire) + 1, std::memory_order_release);
current_index.store(next_idx, std::memory_order_release);
}
bool subscribe(T& out_data, int& last_version) {
int idx = current_index.load(std::memory_order_acquire);
int version = shared[idx].version.load(std::memory_order_acquire);
if (version != last_version) {
out_data = shared[idx].data;
last_version = version;
return true;
}
return false;
}
};
7. 性能对比与基准测试
#include <benchmark/benchmark.h>
#include <atomic>
#include <thread>
#include <vector>
static void BM_AtomicIncrement_Relaxed(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_relaxed);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_Relaxed);
static void BM_AtomicIncrement_AcqRel(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_acq_rel);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_AcqRel);
static void BM_AtomicIncrement_SeqCst(benchmark::State& state) {
std::atomic<int> counter{0};
for (auto _ : state) {
counter.fetch_add(1, std::memory_order_seq_cst);
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicIncrement_SeqCst);
static void BM_AtomicWithContention(benchmark::State& state) {
const int kNumThreads = 4;
std::atomic<int> counter{0};
std::vector<std::thread> threads;
for (auto _ : state) {
counter.store(0, std::memory_order_relaxed);
threads.clear();
for (int i = 0; i < kNumThreads; ++i) {
threads.emplace_back([&counter, &state]() {
for (int j = 0; j < state.range(0); ++j) {
counter.fetch_add(1, std::memory_order_relaxed);
}
});
}
for (auto& t : threads) {
t.join();
}
benchmark::DoNotOptimize(counter);
}
}
BENCHMARK(BM_AtomicWithContention)->Arg(1000)->Arg(10000);
BENCHMARK_MAIN();
8. 常见误区与陷阱
8.1 错误的内存序组合
std::atomic<bool> flag{false};
int data = 0;
void thread1() {
data = 42;
flag.store(true, std::memory_order_release);
}
void thread2() {
if (flag.load(std::memory_order_relaxed)) {
std::cout << data << std::endl;
}
}
8.2 过度使用顺序一致性
class Config {
std::atomic<bool> updated{false};
std::string value;
public:
void update(const std::string& new_value) {
value = new_value;
updated.store(true, std::memory_order_seq_cst);
}
std::string get() {
while (!updated.load(std::memory_order_seq_cst)) {}
return value;
}
};
8.3 忘记同步非原子数据
struct Data {
int a, b, c;
};
std::atomic<Data*> ptr{nullptr};
void writer() {
Data* p = new Data{1, 2, 3};
ptr.store(p, std::memory_order_release);
}
void reader() {
Data* p = ptr.load(std::memory_order_acquire);
if (p) {
std::cout << p->a << ", " << p->b << ", " << p->c << std::endl;
}
}
9. 流程图解析
9.1 内存序的 happens-before 关系
- synchronizes-with
- 线程 A: store flag=true release
- 线程 A: 非原子写 data=42
- 线程 B: 非原子读 data
- 线程 B: load flag=true acquire
- synchronizes-with 关系
9.2 内存屏障效果图
- 线程 B(消费者)
- 线程 A(生产者)
- 同步点 Acquire 屏障
- 原子加载 flag=true
- 普通读操作 #1
- 普通读操作 #2
- 普通写操作 #2
- 普通写操作 #1
- Release 屏障
- 原子存储 flag=true
10. 总结与最佳实践
10.1 关键要点总结
- 理解内存重排:现代处理器和编译器会重排内存操作以优化性能
- 选择合适的内存序:不是所有场景都需要最强的
seq_cst
- 配对使用:
release 必须与 acquire 或 acq_rel 配对
- 同步非原子数据:原子操作只保护自身,非原子数据需要正确的内存序来同步
- 性能权衡:更强的内存序带来更大的性能开销
10.2 最佳实践指南
- 默认使用
memory_order_seq_cst:在不确定时,使用默认值最安全
- 逐步优化:先写正确的代码,再根据需要优化内存序
- 使用现有模式:遵循已知的正确模式(如双重检查锁定)
- 充分测试:内存序错误可能导致偶发性 bug,需要充分测试
- 文档化:在复杂的内存序使用处添加注释
10.3 推荐工具
- ThreadSanitizer:检测数据竞争
- 模型检查工具:如 CDSChecker、Nidhugg
- 性能分析器:分析不同内存序的性能影响
class SimpleSafeCounter {
std::atomic<int> count{0};
public:
void increment() {
count.fetch_add(1);
}
int get() const {
return count.load();
}
};
结语
C++ 的内存序是一个强大但复杂的特性。通过理解 6 种内存序的语义和适用场景,开发者可以在保证正确性的前提下,编写出高性能的并发程序。记住:正确性永远比性能更重要。只有在确实需要优化性能,并且完全理解内存序语义时,才应该使用非默认的内存序。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online