跳到主要内容 C++ 多线程开发完整指南 | 极客日志
C++
C++ 多线程开发完整指南 C++ 多线程开发的基础与进阶知识。涵盖线程创建与管理、数据共享与同步机制(互斥锁、条件变量)、原子操作及内存顺序。深入讲解了线程池实现、读写锁、C++20 协程与信号量等高级模式。同时提供了避免死锁、伪共享等性能优化技巧,以及调试测试方法。旨在帮助开发者掌握现代 C++ 并发编程的核心技能,编写高效安全的并行程序。
人间失格 发布于 2026/3/27 更新于 2026/4/16 3 浏览一、为什么需要多线程
在现代计算机体系结构中,多核处理器已成为标准配置。多线程编程允许我们充分利用这些计算资源,通过并行执行任务来提升程序性能。C++11 之前,多线程编程依赖于平台特定的 API(如 POSIX pthreads、Windows 线程 API),C++11 标准引入了 <thread> 等头文件,为多线程编程提供了统一、可移植的解决方案。
二、C++ 多线程基础
2.1 第一个多线程程序
#include <iostream>
#include
{
std::cout << << id << << std::endl;
std::this_thread:: (std::chrono:: ( ));
std::cout << << id << << std::endl;
}
{
std::cout << << std::endl;
;
;
;
t ();
t ();
t ();
std::cout << << std::endl;
;
}
<thread>
#include <chrono>
void threadFunction (int id)
"线程 "
" 开始执行"
sleep_for
seconds
1
"线程 "
" 结束执行"
int main ()
"主线程开始,创建 3 个子线程"
std::thread t1 (threadFunction, 1 )
std::thread t2 (threadFunction, 2 )
std::thread t3 (threadFunction, 3 )
1.
join
2.
join
3.
join
"所有线程执行完毕"
return
0
2.2 线程管理的基本操作 #include <thread>
#include <iostream>
void worker () {
std::cout << "工作线程 ID: " << std::this_thread::get_id () << std::endl;
}
int main () {
unsigned int n = std::thread::hardware_concurrency ();
std::cout << "硬件支持的最大并发线程数:" << n << std::endl;
std::thread t (worker) ;
std::cout << "主线程 ID: " << std::this_thread::get_id () << std::endl;
std::cout << "子线程 ID: " << t.get_id () << std::endl;
if (t.joinable ()) {
t.join ();
}
return 0 ;
}
三、数据共享与同步
3.1 竞态条件与数据竞争 #include <iostream>
#include <thread>
#include <vector>
int counter = 0 ;
void increment () {
for (int i = 0 ; i < 100000 ; ++i) {
counter++;
}
}
int main () {
std::thread t1 (increment) ;
std::thread t2 (increment) ;
t1. join ();
t2. join ();
std::cout << "计数器值:" << counter << std::endl;
return 0 ;
}
3.2 互斥锁(Mutex) #include <iostream>
#include <thread>
#include <mutex>
#include <vector>
int counter = 0 ;
std::mutex mtx;
void safeIncrement () {
for (int i = 0 ; i < 100000 ; ++i) {
std::lock_guard<std::mutex> lock (mtx) ;
counter++;
}
}
void tryLockExample () {
std::timed_mutex timed_mtx;
for (int i = 0 ; i < 5 ; ++i) {
if (timed_mtx.try_lock ()) {
std::cout << "线程 " << std::this_thread::get_id () << " 获取到锁" << std::endl;
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
timed_mtx.unlock ();
break ;
} else {
std::cout << "线程 " << std::this_thread::get_id () << " 未能获取锁,等待重试" << std::endl;
std::this_thread::sleep_for (std::chrono::milliseconds (50 ));
}
}
}
int main () {
std::thread t1 (safeIncrement) ;
std::thread t2 (safeIncrement) ;
t1. join ();
t2. join ();
std::cout << "安全的计数器值:" << counter << std::endl;
std::thread t3 (tryLockExample) ;
std::thread t4 (tryLockExample) ;
t3. join ();
t4. join ();
return 0 ;
}
3.3 递归锁与锁保护 #include <iostream>
#include <thread>
#include <mutex>
std::recursive_mutex rec_mtx;
void recursiveFunction (int depth) {
if (depth <= 0 ) return ;
rec_mtx.lock ();
std::cout << "深度:" << depth << ", 线程 ID: " << std::this_thread::get_id () << std::endl;
recursiveFunction (depth - 1 );
rec_mtx.unlock ();
}
void flexibleLockExample () {
std::mutex mtx;
std::unique_lock<std::mutex> lock (mtx, std::defer_lock) ;
std::cout << "准备获取锁..." << std::endl;
lock.lock ();
std::cout << "获取到锁,执行临界区操作" << std::endl;
lock.unlock ();
lock.lock ();
std::cout << "重新锁定" << std::endl;
}
int main () {
std::thread t1 (recursiveFunction, 3 ) ;
std::thread t2 (flexibleLockExample) ;
t1. join ();
t2. join ();
return 0 ;
}
3.4 条件变量 #include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int > dataQueue;
bool finished = false ;
void producer (int items) {
for (int i = 0 ; i < items; ++i) {
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::lock_guard<std::mutex> lock (mtx) ;
dataQueue.push (i);
std::cout << "生产:" << i << std::endl;
cv.notify_one ();
}
{
std::lock_guard<std::mutex> lock (mtx) ;
finished = true ;
}
cv.notify_all ();
}
void consumer (int id) {
while (true ) {
std::unique_lock<std::mutex> lock (mtx) ;
cv.wait (lock, [] { return !dataQueue.empty () || finished; });
if (finished && dataQueue.empty ()) {
break ;
}
if (!dataQueue.empty ()) {
int value = dataQueue.front ();
dataQueue.pop ();
lock.unlock ();
std::cout << "消费者 " << id << " 消费:" << value << std::endl;
}
}
std::cout << "消费者 " << id << " 结束" << std::endl;
}
int main () {
std::thread prod (producer, 10 ) ;
std::thread cons1 (consumer, 1 ) ;
std::thread cons2 (consumer, 2 ) ;
prod.join ();
cons1. join ();
cons2. join ();
return 0 ;
}
四、原子操作
4.1 原子类型 #include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int > atomicCounter (0 ) ;
std::atomic<bool > ready (false ) ;
void atomicIncrement () {
while (!ready.load (std::memory_order_acquire)) {
std::this_thread::yield ();
}
for (int i = 0 ; i < 100000 ; ++i) {
atomicCounter.fetch_add (1 , std::memory_order_relaxed);
}
}
void testAtomicOperations () {
std::atomic<int > value (10 ) ;
int expected = 10 ;
bool success = value.compare_exchange_strong (expected, 20 );
std::cout << "CAS 操作:" << (success ? "成功" : "失败" ) << ", 当前值:" << value.load () << std::endl;
std::atomic_flag flag = ATOMIC_FLAG_INIT;
bool was_set = flag.test_and_set ();
std::cout << "第一次 test_and_set: " << was_set << std::endl;
flag.clear ();
was_set = flag.test_and_set ();
std::cout << "清除后 test_and_set: " << was_set << std::endl;
}
int main () {
const int num_threads = 4 ;
std::vector<std::thread> threads;
for (int i = 0 ; i < num_threads; ++i) {
threads.emplace_back (atomicIncrement);
}
ready.store (true , std::memory_order_release);
for (auto & t : threads) {
t.join ();
}
std::cout << "原子计数器最终值:" << atomicCounter.load () << std::endl;
testAtomicOperations ();
return 0 ;
}
4.2 内存顺序 #include <atomic>
#include <thread>
#include <iostream>
std::atomic<int > x (0 ) , y (0 ) ;
std::atomic<int > z (0 ) ;
void write_x_then_y () {
x.store (1 , std::memory_order_relaxed);
y.store (1 , std::memory_order_release);
}
void read_y_then_x () {
while (!y.load (std::memory_order_acquire));
if (x.load (std::memory_order_relaxed)) {
z.fetch_add (1 );
}
}
void memoryOrderDemo () {
std::thread t1 (write_x_then_y) ;
std::thread t2 (read_y_then_x) ;
t1. join ();
t2. join ();
std::cout << "z = " << z.load () << std::endl;
}
int main () {
for (int i = 0 ; i < 10 ; ++i) {
x = 0 ; y = 0 ; z = 0 ;
memoryOrderDemo ();
}
return 0 ;
}
五、高级多线程模式
5.1 线程池实现 #include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
class ThreadPool {
public :
ThreadPool (size_t numThreads) : stop (false ) {
for (size_t i = 0 ; i < numThreads; ++i) {
workers.emplace_back ([this ] {
while (true ) {
std::function<void ()> task;
{
std::unique_lock<std::mutex> lock (queueMutex);
condition.wait (lock, [this ] { return stop || !tasks.empty (); });
if (stop && tasks.empty ()) {
return ;
}
task = std::move (tasks.front ());
tasks.pop ();
}
task ();
}
});
}
}
template <class F, class ... Args>
auto enqueue (F&& f, Args&&... args) -> std::future<typename std::result_of<F (Args...) >::type> {
using return_type = typename std::result_of<F (Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type ()>>(
std::bind (std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future ();
{
std::unique_lock<std::mutex> lock (queueMutex) ;
if (stop) {
throw std::runtime_error ("线程池已停止" );
}
tasks.emplace ([task]() { (*task)(); });
}
condition.notify_one ();
return res;
}
~ThreadPool () {
{
std::unique_lock<std::mutex> lock (queueMutex) ;
stop = true ;
}
condition.notify_all ();
for (std::thread& worker : workers) {
worker.join ();
}
}
private :
std::vector<std::thread> workers;
std::queue<std::function<void ()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
int main () {
ThreadPool pool (4 ) ;
std::vector<std::future<int >> results;
for (int i = 0 ; i < 8 ; ++i) {
results.emplace_back (pool.enqueue ([i] {
std::cout << "任务 " << i << " 在线程 " << std::this_thread::get_id () << " 执行" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds (1 ));
return i * i;
}));
}
for (auto & result : results) {
std::cout << "结果:" << result.get () << std::endl;
}
return 0 ;
}
5.2 读写锁(C++14 及以上) #include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <chrono>
class ThreadSafeData {
private :
mutable std::shared_mutex mutex_;
int data_;
public :
ThreadSafeData (int init = 0 ) : data_ (init) {}
int read () const {
std::shared_lock<std::shared_mutex> lock (mutex_) ;
std::cout << "读取:" << data_ << " 线程 ID: " << std::this_thread::get_id () << std::endl;
return data_;
}
void write (int value) {
std::unique_lock<std::shared_mutex> lock (mutex_) ;
std::cout << "写入:" << value << " 线程 ID: " << std::this_thread::get_id () << std::endl;
data_ = value;
}
void increment () {
std::unique_lock<std::shared_mutex> lock (mutex_) ;
++data_;
std::cout << "增量到:" << data_ << " 线程 ID: " << std::this_thread::get_id () << std::endl;
}
};
int main () {
ThreadSafeData data (0 ) ;
std::vector<std::thread> threads;
for (int i = 0 ; i < 5 ; ++i) {
threads.emplace_back ([&data, i] {
for (int j = 0 ; j < 3 ; ++j) {
data.read ();
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
}
});
}
for (int i = 0 ; i < 2 ; ++i) {
threads.emplace_back ([&data, i] {
for (int j = 0 ; j < 2 ; ++j) {
data.write (i * 10 + j);
std::this_thread::sleep_for (std::chrono::milliseconds (200 ));
}
});
}
threads.emplace_back ([&data] {
for (int i = 0 ; i < 4 ; ++i) {
data.increment ();
std::this_thread::sleep_for (std::chrono::milliseconds (150 ));
}
});
for (auto & t : threads) {
t.join ();
}
std::cout << "最终值:" << data.read () << std::endl;
return 0 ;
}
六、C++20 新特性:协程和信号量
6.1 信号量(C++20) #include <iostream>
#include <thread>
#include <semaphore>
#include <vector>
std::counting_semaphore<10> emptySlots (10 ) ;
std::counting_semaphore<10> fullSlots (0 ) ;
std::mutex bufferMutex;
std::queue<int > buffer;
bool done = false ;
void producer (int id) {
for (int i = 0 ; i < 5 ; ++i) {
emptySlots.acquire ();
{
std::lock_guard<std::mutex> lock (bufferMutex) ;
buffer.push (i);
std::cout << "生产者 " << id << " 生产:" << i << std::endl;
}
fullSlots.release ();
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
}
}
void consumer (int id) {
while (!done || !buffer.empty ()) {
fullSlots.acquire ();
if (done && buffer.empty ()) {
fullSlots.release ();
break ;
}
int value;
{
std::lock_guard<std::mutex> lock (bufferMutex) ;
if (!buffer.empty ()) {
value = buffer.front ();
buffer.pop ();
std::cout << "消费者 " << id << " 消费:" << value << std::endl;
}
}
emptySlots.release ();
}
}
int main () {
const int numProducers = 3 ;
const int numConsumers = 2 ;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0 ; i < numProducers; ++i) {
producers.emplace_back (producer, i);
}
for (int i = 0 ; i < numConsumers; ++i) {
consumers.emplace_back (consumer, i);
}
for (auto & p : producers) {
p.join ();
}
done = true ;
for (int i = 0 ; i < numConsumers; ++i) {
fullSlots.release ();
}
for (auto & c : consumers) {
c.join ();
}
std::cout << "生产消费完成" << std::endl;
return 0 ;
}
七、最佳实践与性能考虑
7.1 避免死锁的准则 #include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
void deadlockThread1 () {
std::lock_guard<std::mutex> lock1 (mtx1) ;
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::lock_guard<std::mutex> lock2 (mtx2) ;
std::cout << "线程 1 完成" << std::endl;
}
void deadlockThread2 () {
std::lock_guard<std::mutex> lock2 (mtx2) ;
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
std::lock_guard<std::mutex> lock1 (mtx1) ;
std::cout << "线程 2 完成" << std::endl;
}
void safeThread1 () {
std::lock (mtx1, mtx2);
std::lock_guard<std::mutex> lock1 (mtx1, std::adopt_lock) ;
std::lock_guard<std::mutex> lock2 (mtx2, std::adopt_lock) ;
std::cout << "安全线程 1 完成" << std::endl;
}
void safeThread2 () {
std::lock (mtx1, mtx2);
std::lock_guard<std::mutex> lock1 (mtx1, std::adopt_lock) ;
std::lock_guard<std::mutex> lock2 (mtx2, std::adopt_lock) ;
std::cout << "安全线程 2 完成" << std::endl;
}
void scopedLockExample () {
std::scoped_lock lock (mtx1, mtx2) ;
std::cout << "使用 scoped_lock 安全加锁" << std::endl;
}
int main () {
std::thread t3 (safeThread1) ;
std::thread t4 (safeThread2) ;
t3. join ();
t4. join ();
std::thread t5 (scopedLockExample) ;
t5. join ();
return 0 ;
}
7.2 性能优化技巧 #include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <chrono>
struct BadAlignment {
int a;
int b;
};
struct alignas (64 ) GoodAlignment {
int a;
};
struct alignas (64 ) GoodAlignment2 {
int b;
};
void falseSharingTest () {
const int iterations = 100000000 ;
BadAlignment bad;
auto start = std::chrono::high_resolution_clock::now ();
std::thread t1 ([&bad, iterations] {
for (int i = 0 ; i < iterations; ++i) {
bad.a++;
}
}) ;
std::thread t2 ([&bad, iterations] {
for (int i = 0 ; i < iterations; ++i) {
bad.b++;
}
}) ;
t1. join ();
t2. join ();
auto end = std::chrono::high_resolution_clock::now ();
auto duration = std::chrono::duration_cast <std::chrono::milliseconds>(end - start);
std::cout << "伪共享耗时:" << duration.count () << "ms" << std::endl;
GoodAlignment good1;
GoodAlignment2 good2;
start = std::chrono::high_resolution_clock::now ();
std::thread t3 ([&good1, iterations] {
for (int i = 0 ; i < iterations; ++i) {
good1. a++;
}
}) ;
std::thread t4 ([&good2, iterations] {
for (int i = 0 ; i < iterations; ++i) {
good2. b++;
}
}) ;
t3. join ();
t4. join ();
end = std::chrono::high_resolution_clock::now ();
duration = std::chrono::duration_cast <std::chrono::milliseconds>(end - start);
std::cout << "避免伪共享耗时:" << duration.count () << "ms" << std::endl;
}
int main () {
falseSharingTest ();
return 0 ;
}
八、调试与测试
8.1 线程安全的单元测试 #include <iostream>
#include <thread>
#include <vector>
#include <cassert>
#include <future>
class ThreadSafeCounter {
private :
std::mutex mtx;
int count;
public :
ThreadSafeCounter () : count (0 ) {}
void increment () {
std::lock_guard<std::mutex> lock (mtx) ;
++count;
}
int get () const {
std::lock_guard<std::mutex> lock (mtx) ;
return count;
}
};
void testThreadSafety () {
ThreadSafeCounter counter;
const int numThreads = 10 ;
const int incrementsPerThread = 1000 ;
std::vector<std::future<void >> futures;
for (int i = 0 ; i < numThreads; ++i) {
futures.emplace_back (std::async (std::launch::async, [&counter, incrementsPerThread] {
for (int j = 0 ; j < incrementsPerThread; ++j) {
counter.increment ();
}
}));
}
for (auto & future : futures) {
future.wait ();
}
int expected = numThreads * incrementsPerThread;
int actual = counter.get ();
std::cout << "期望值:" << expected << std::endl;
std::cout << "实际值:" << actual << std::endl;
assert (actual == expected);
std::cout << "测试通过!" << std::endl;
}
int main () {
try {
testThreadSafety ();
} catch (const std::exception& e) {
std::cerr << "测试失败:" << e.what () << std::endl;
return 1 ;
}
return 0 ;
}
九、总结与进阶学习
9.1 关键点总结
线程创建与管理 :使用 std::thread 创建线程,理解 join 和 detach 的区别
数据同步 :掌握互斥锁、条件变量、原子操作的使用场景
避免常见问题 :识别和避免死锁、竞态条件、伪共享
性能优化 :合理选择同步机制,减少锁竞争
现代 C++ 特性 :利用 C++14/17/20 的新特性简化多线程编程
9.2 进阶学习方向
并行算法 :C++17 的并行 STL 算法
GPU 编程 :CUDA、OpenCL、SYCL
分布式计算 :MPI、gRPC、ZeroMQ
异步编程模型 :Promise/Future、反应式编程
并发数据结构 :无锁队列、并发哈希表
9.3 推荐工具
调试工具 :ThreadSanitizer、Helgrind、Intel Inspector
性能分析 :Perf、VTune、AMD uProf
可视化 :Chrome Tracing、Vampir
多线程编程是 C++ 高级开发的核心技能之一。从基础同步机制到高级并发模式,需要不断实践和积累经验。记住:过早优化是万恶之源 ,在确保正确性的前提下进行性能优化,使用工具验证线程安全性,编写可维护的并发代码。
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
JSON美化和格式化 将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online