跳到主要内容C/C++ 线程池详解 | 极客日志C++算法
C/C++ 线程池详解
本文深入解析了 C++ 线程池的设计与实现。涵盖线程池概念、架构组成、任务提交与执行流程、工作线程生命周期管理及优雅关闭机制。重点阐述了条件变量等待中的虚假唤醒处理、锁作用域对并发性能的影响、析构函数中的死锁规避策略以及 Lambda 表达式中 shared_ptr 的正确捕获方式。文末提供了基于 C++11 标准库的完整可运行代码示例,总结了使用谓词等待、释放锁后执行任务等关键最佳实践,适用于高并发场景下的资源管理与任务调度优化。
星河入梦1 浏览 线程池详解 (Thread Pool Deep Dive)
什么是线程池?(What is a Thread Pool?)
线程池是一种多线程处理模式,它预先创建一定数量的线程,将任务放入队列中,由空闲的线程从队列中取出任务并执行。
为什么需要线程池?
频繁创建和销毁线程的开销很大。线程池通过复用已创建的线程来避免这种开销,同时可以控制并发线程的数量,防止系统资源耗尽。
架构概览 (Architecture Overview)
┌─────────────────────────────────────────────────────────────────────────────┐
│ ThreadPool │
│ │
│ ┌─────────────┐ ┌─────────────────────────────────┐ │
│ │ Main Thread │ │ Queue │ │
│ │ │ │ ┌──────┬──────┬──────┬──────┐ │ │
│ │ submit │──▶│ │ │ │ │ │ │ │
│ │ pool.(f) │ │ │ │ │ │ │ │
│ │ │ │ └──────┴──────┴──────┴──────┘ │ │
│ └─────────────┘ └───────────────┬─────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────┐ │
│ │ Worker Threads │ │
│ │ │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ W1 │ │ W2 │ │ W3 │ ... │ │ │
│ │ └─────┘ └─────┘ └─────┘ │ │
│ └───────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Synchronization Primitives │ │
│ │ std::mutex mutex_ std::condition_variable cv_ │ │
│ │ (保护队列访问) (线程间通信) │ │
│ │ (Protects queue) (Thread communication) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Task
Task
Task
Task
Task
submit
1
2
3
4
核心组件 (Core Components)
┌────────────────────────────────────────────────────────────────┐
│ ThreadPool 成员变量 │
│ (Member Variables) │
├────────────────────────────────────────────────────────────────┤
│ │
│ workers_ 任务队列 同步原语 │
│ 工作线程 tasks_ mutex_ + cv_ │
│ │
│ ┌─────────┐ ┌─────────────┐ ┌─────────────────┐ │
│ │ thread │ │ std::queue │ │ std::mutex │ │
│ │ thread │ │ <function │ │ │ │
│ │ thread │ │ <void()>> │ │ std::condition │ │
│ │ thread │ │ │ │ _variable │ │
│ └─────────┘ └─────────────┘ └─────────────────┘ │
│ │
│ stop_ │
│ 停止标志 (bool) │
└────────────────────────────────────────────────────────────────┘
任务提交流程 (Task Submit Flow)
Main Thread ThreadPool
│
│ pool.submit(task)
│───────────────────────────────────────▶│
│
│ ┌─────────────┴─────────────┐
│ 1. 获取 mutex_ 锁 │
│ Acquire mutex_ lock │
│ ├───────────────────────────┤
│ 2. 检查 stop_ 标志 │
│ Check stop_ flag │
│ ├───────────────────────────┤
│ 3. 任务入队 │
│ Push task to queue │
│ ├───────────────────────────┤
│ 4. 释放锁 │
│ Release lock │
│ ├───────────────────────────┤
│ 5. cv_.notify_one() │
│ Wake one worker │
│ └─────────────┬─────────────┘
│ │◀───────────────────────────────────────│
│ return
工作线程生命周期 (Worker Thread Lifecycle)
┌──────────────────┐
│ Start │
│ 线程启动 │
└────────┬─────────┘
▼
┌──────────────────────────────┐
│ 获取锁 (Acquire Lock) │
└──────────────┬───────────────┘
▼
┌──────────────────────────────┐
│ cv_.wait(lock, predicate) │◀─────────────┐
│ │ │
│ 等待条件: │ │
│ stop_ || !tasks_.empty() │ │
│ │ │
│ Wait for: │ │
│ stop OR queue not empty │ │
└──────────────┬───────────────┘
│ │
│ (被唤醒 woken up)
▼
┌──────────────────────────────┐
│ stop_ && tasks_.empty() ? │
└──────────────┬───────────────┘
│
│
┌────────────┴────────────┐
│ YES │ NO
▼ ▼
┌───────────────────┐ ┌──────────────────────────┐
│ return │ │ 取出任务 │
│ 线程退出 │ │ Fetch task from queue │
└───────────────────┘ └────────────┬─────────────┘
│ │
▼ ▼
┌──────────────────────────┐ ┌──────────────────────────┐
│ 释放锁 │ │ 执行任务 (无锁) │
│ Release lock │ │ Execute task (no lock) │
└────────────┬─────────────┘ └────────────┬─────────────┘
│ │
└──────────────────────────┘
关闭流程 (Shutdown Flow)
Main Thread Worker Threads
│
│ ~ThreadPool()
│ (waiting on cv_)
│
▼
┌─────────────────┐
│ 获取锁 │
│ Acquire lock │
└───────┬─────────┘
│
▼
┌─────────────────┐
│ stop_ = true │
└───────┬─────────┘
│
▼
┌─────────────────┐
│ 释放锁 │
│ Release lock │
└───────┬─────────┘
│
▼
┌─────────────────┐ cv_.notify_all()
│ 唤醒所有线程 │ ──────────────────────▶
│ Wake all │
└───────┬─────────┘
│
▼
┌─────────────────┐
│ 检查条件 │
│ Check condition │
│ │
│ stop_=true │
│ queue empty? │
└────────┬────────┘
│
▼
┌─────────────────┐
│ 是 → 退出 │
│ Yes → return │
│ │
│ 否 → 处理剩余 │
│ No → process │
│ remaining tasks │
└────────┬────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐
│ join() 所有线程 │◀───────────────│ 线程结束
│ Wait for all │ │ Thread exits │
└───────┬─────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ 析构完成 │
│ Destructor done │
└─────────────────┘
条件变量等待机制 (Condition Variable Wait Mechanism)
┌───────────────────────────────────────────────────────────────────────────┐
│ │
│ cv_.wait(lock, predicate) 内部实现等价于: │
│ Internally equivalent to: │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ while (!predicate()) {
│ │ cv_.wait(lock);
│ │ } │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ├───────────────────────────────────────────────────────────────────────────┤
│ │ 为什么需要循环? (Why loop?) │
│ │ │
│ │ 情况 1: 虚假唤醒 (Spurious Wakeup) │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │ │
│ │ │ Thread A: waiting... → (spurious wakeup!) → 没有任务! │ │
│ │ │ → No task! │ │
│ │ │ │ │
│ │ │ 如果用 if: 会尝试访问空队列 → 崩溃! │ │
│ │ │ With if: would access empty queue → crash! │ │
│ │ │ │ │
│ │ │ 如果用 while: 重新检查条件 → 继续等待 │ │
│ │ │ With while: recheck condition → continue waiting │ │
│ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ │ 情况 2: 任务被抢走 (Stolen Task) │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │ │
│ │ │ Time Thread A Thread B Queue │ │
│ │ │ ──── ──────── ──────── ───── │ │
│ │ │ 1 waiting waiting empty │ │
│ │ │ 2 (notified) (also wakes) [task] │ │
│ │ │ 3 gets lock waiting [task] │ │
│ │ │ 4 takes task gets lock empty │ │
│ │ │ 5 working queue empty! empty │ │
│ │ │ │ │
│ │ │ Thread B 必须重新检查,否则访问空队列 │ │
│ │ │ Thread B must recheck or it accesses empty queue │ │
│ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ └───────────────────────────────────────────────────────────────────────────┘
锁的作用域问题 (Lock Scope Issues)
正确 vs 错误 (Correct vs Wrong)
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ ❌ 错误 (WRONG) │ │ ✅ 正确 (CORRECT) │
├─────────────────────────────────┤ ├─────────────────────────────────┤
│ │ │ │
│ void worker() { │ │ void worker() { │
│ while (true) { │ │ while (true) { │
│ unique_lock lk(mutex_); │ │ function<void()> task; │
│ cv_.wait(lk, ...); │ │ { │
│ task = move(queue.front); │ │ unique_lock lk(mutex_); │
│ queue.pop(); │ │ cv_.wait(lk, ...); │
│ } ← 锁在此释放 │ │ task(); ← 持有锁执行! │
│ (Lock released here) │ │ (Hold lock!) │
│ task(); ← 无锁执行 │ │ } │
│ (No lock held) │ │ task(); ← 无锁执行 │
│ } │ │ (No lock held) │
│ } │ │ } │
└─────────────────────────────────┘ └─────────────────────────────────┘
│ │ │ │
│ ▼ │ │ ▼ │
│ ┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ │ 结果:所有任务串行执行 │ │ 结果:任务真正并行执行 │
│ │ Result: All tasks serialized │ │ Result: True parallel execution │
│ │ │ │ │
│ │ W1: ████████ │ │ W1: ████ │
│ │ W2: ████████ │ │ W2: ████ │
│ │ W3: ████████ │ │ W3: ████ │
│ │ W4: ████ W4: ████ │ │ W4: ████ │
│ │ │ │ │
│ │ 时间 ──────────────────────▶ │ │ 时间 ────▶ │
└─────────────────────────────────┘ └─────────────────────────────────┘
析构函数死锁问题 (Destructor Deadlock)
┌───────────────────────────────────────────────────────────────────────────┐
│ ❌ 死锁场景 (Deadlock) │
├───────────────────────────────────────────────────────────────────────────┤
│ │
│ Main Thread Worker Thread │
│ │
│ │
│ ▼ │
│ ┌─────────────┐ │
│ │ lock(mutex) │ │
│ └──────┬──────┘ │
│ │ │
│ │ ▼ ▼ │
│ │ ┌─────────────┐ ┌─────────────┐ │
│ │ │ stop = true │ │ want lock │ │
│ │ └──────┬──────┘ │ 想要获取锁 │ │
│ │ └──────┬──┘ └─────────────┘ │
│ │ ▼ │
│ │ ┌─────────────┐ │
│ │ │ join() │ ─────等待─────┐ │
│ │ │ 等待线程结束 │ │
│ │ └─────────────┘ │
│ │ │
│ │ │
│ │ ▼ ▼ │
│ │ ┌─────────────────────────┐ │
│ │ │ │
│ │ │ 永远等待 (Forever) │
│ │ │ 💀 DEADLOCK 💀 │
│ │ │ │
│ │ └─────────────────────────┘ │
│ └───────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────────┐
│ ✅ 正确做法 (Correct) │
├───────────────────────────────────────────────────────────────────────────┤
│ │
│ ~ThreadPool() { │
│ { ◀─── 作用域块 │
│ Scoped block │
│ lock_guard lk(mutex); │
│ stop_ = true; │
│ } ◀─── 锁在此释放 │
│ Lock released here │
│ cv_.notify_all(); │
│ for (auto& t : workers_) │
│ t.join(); ◀─── 此时不持有锁 │
│ } No lock held here │
│ │
└───────────────────────────────────────────────────────────────────────────┘
shared_ptr 捕获机制 (shared_ptr Capture in Lambda)
┌───────────────────────────────────────────────────────────────────────────┐
│ submitWithFuture() 内存管理 │
│ Memory Management │
├───────────────────────────────────────────────────────────────────────────┤
│ │
│ auto task = make_shared<packaged_task<T()>>(f); │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ task (shared_ptr) │ │
│ │ │ │
│ │ ref_count = 1 │ │
│ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ packaged_task │ │ │
│ │ │ on heap │ │ │
│ │ └─────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ tasks_.push([task]() { (*task)(); }); ◀─── 按值捕获 │
│ Capture by value │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ task (local) lambda (in queue) │ │
│ │ │ │
│ │ ref_count = 2 │ │
│ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ packaged_task │ │ │
│ │ │ on heap │ │ │
│ │ └─────────────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ return fut;
│
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ lambda (in queue) │ │
│ │ │ │
│ │ ref_count = 1 ◀─── 仍然存活! │ │
│ │ Still alive! │ │
│ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ packaged_task │ │ │
│ │ └─────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ ├───────────────────────────────────────────────────────────────────────────┤
│ │ ❌ 如果用引用捕获 [&task]: │
│ │ If captured by reference: │
│ │ │
│ │ submit() 返回 → task 销毁 → lambda 持有悬空指针 → 💀 崩溃 │
│ │ submit returns → task destroyed → lambda has dangling ptr → crash │
│ │
│ └───────────────────────────────────────────────────────────────────────────┘
完整实现 (Complete Implementation)
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <stdexcept>
class ThreadPool {
public:
explicit ThreadPool(size_t n) : stop_(false) {
for (size_t i = 0; i < n; i++) {
workers_.emplace_back([this]{ worker(); });
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
~ThreadPool() {
{
std::lock_guard<std::mutex> lk(mutex_);
stop_ = true;
}
cv_.notify_all();
for (auto& t : workers_) {
t.join();
}
}
void submit(std::function<void()> task) {
{
std::lock_guard<std::mutex> lk(mutex_);
if (stop_) {
throw std::runtime_error("submit on stopped ThreadPool");
}
tasks_.push(std::move(task));
}
cv_.notify_one();
}
template<typename F>
auto submitWithFuture(F&& f) -> std::future<decltype(f())> {
using ReturnType = decltype(f());
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::forward<F>(f));
std::future<ReturnType> fut = task->get_future();
{
std::lock_guard<std::mutex> lk(mutex_);
if (stop_) {
throw std::runtime_error("submit on stopped ThreadPool");
}
tasks_.push([task]() { (*task)(); });
}
cv_.notify_one();
return fut;
}
private:
void worker() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lk(mutex_);
cv_.wait(lk, [this]{ return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
}
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cv_;
bool stop_;
};
使用示例 (Usage Example)
#include <iostream>
#include <chrono>
int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; i++) {
pool.submit([i]{
std::cout << "Task " << i << " running on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
}
auto future1 = pool.submitWithFuture([]{ return 42; });
auto future2 = pool.submitWithFuture([]{ return std::string("hello from thread pool"); });
std::cout << "Result 1: " << future1.get() << std::endl;
std::cout << "Result 2: " << future2.get() << std::endl;
return 0;
}
总结 (Summary)
┌───────────────────────────────────────────────────────────────────────────┐
│ 关键要点 (Key Takeaways) │
├───────────────────────────────────────────────────────────────────────────┤
│ │
│ ✅ 使用 while 循环或谓词等待条件变量 │
│ Use while loop or predicate for condition variable wait │
│ │
│ ✅ 执行任务前释放锁,允许真正的并行 │
│ Release lock before executing task for true parallelism │
│ │
│ ✅ 析构时先释放锁,再 join 线程 │
│ Release lock before joining threads in destructor │
│ │
│ ✅ Lambda 捕获 shared_ptr 时用值捕获,避免悬空引用 │
│ Capture shared_ptr by value in lambda to avoid dangling reference │
│ │
│ ✅ 检查停止标志,优雅处理关闭后的提交请求 │
│ Check stop flag, gracefully handle submissions after shutdown │
│ │
│ ✅ 禁止拷贝构造和赋值 │
│ Disable copy constructor and assignment │
│ │
└───────────────────────────────────────────────────────────────────────────┘
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown 转 HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
- HTML 转 Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online