C++ 多线程与并发系统取向(六)—— 任务系统骨架:mutex + condition_variable + atomic 组合实战
一、目标:做一个“可用”的最小任务系统
我们要实现一个简化版任务系统,支持:
- ✅ 提交任务(任意函数)
- ✅ 工作线程自动执行
- ✅ 无任务时阻塞等待
- ✅ 支持安全关闭
- ✅ 正确释放线程
你可以理解为:
线程池的前身。
二、设计思路(系统取向)
我们需要三类能力:
1️⃣ 资源保护(队列)
- 任务队列
- 多线程访问
- 必须用 mutex 保护
2️⃣ 线程协作(等待任务)
- 队列空 → 线程休眠
- 新任务 → 唤醒线程
- 使用 condition_variable
3️⃣ 状态控制(停止系统)
- 停止标志
- 使用 atomic<bool>
三、整体结构图
主线程 │ ├── submit() → push 任务 → notify │ ▼ 工作线程 │ ├── wait 等待任务 ├── 取任务 ├── unlock ├── 执行 └── 循环四、完整实现(可直接运行)
1️⃣ 头文件
#include <iostream> #include <thread> #include <vector> #include <queue> #include <mutex> #include <condition_variable> #include <atomic> #include <functional>2️⃣ 任务系统类
class TaskSystem { public: TaskSystem(size_t threadCount) : running_(true) { for (size_t i = 0; i < threadCount; ++i) { workers_.emplace_back([this] { this->workerLoop(); }); } } ~TaskSystem() { shutdown(); } // 提交任务 void submit(std::function<void()> task) { { std::unique_lock<std::mutex> lock(mtx_); tasks_.push(task); } cv_.notify_one(); } // 关闭系统 void shutdown() { running_.store(false); cv_.notify_all(); for (auto& t : workers_) { if (t.joinable()) { t.join(); } } } private: void workerLoop() { while (true) { std::function<void()> task; { std::unique_lock<std::mutex> lock(mtx_); cv_.wait(lock, [this] { return !tasks_.empty() || !running_.load(); }); if (!running_.load() && tasks_.empty()) { return; } task = tasks_.front(); tasks_.pop(); } task(); // 在锁外执行任务 } } private: std::vector<std::thread> workers_; std::queue<std::function<void()>> tasks_; std::mutex mtx_; std::condition_variable cv_; std::atomic<bool> running_; };五、测试代码
int main() { TaskSystem pool(3); for (int i = 0; i < 10; ++i) { pool.submit([i] { std::cout << "Task " << i << " running in thread " << std::this_thread::get_id() << std::endl; }); } std::this_thread::sleep_for(std::chrono::seconds(1)); pool.shutdown(); return 0; }输出类似:
Task 0 running in thread 1234 Task 1 running in thread 5678 ...六、关键点逐条拆解(非常重要)
1️⃣ 为什么 running_ 用 atomic?
因为:
- 多线程读写
- 只控制“状态”
- 不涉及复杂资源
符合 atomic 使用场景。
2️⃣ 为什么任务执行必须在锁外?
如果这样写:
task();放在锁内:
- 所有线程会被阻塞
- 并发能力消失
原则:
锁保护资源,不保护业务逻辑
3️⃣ 为什么 wait 要带 predicate?
cv_.wait(lock, condition);防止:
- 假唤醒
- 丢通知
4️⃣ 为什么 shutdown 要 notify_all?
因为:
- 可能多个线程在等待
- 必须全部唤醒退出
七、Java 类比
Java 线程池:
ExecutorService pool = Executors.newFixedThreadPool(3); pool.submit(() -> {}); pool.shutdown();Java 帮你封装了:
- 任务队列
- 锁
- 条件变量
- 状态控制
C++:
你自己写。
这就是 C++ 并发“系统级能力”的核心差异。
八、系统层面你已经掌握了什么?
现在你已经具备:
✔ 线程模型理解
谁共享数据?
✔ 资源保护
mutex + RAII
✔ 线程协作
condition_variable
✔ 状态控制
atomic
✔ 生命周期管理
join + shutdown
九、工程升级方向(后续可以做)
- 支持有界队列
- 支持优先级任务
- 支持 future
- 支持返回值
- 支持任务取消
十、本篇总结口诀
队列用 mutex
等待用 cv
状态用 atomic
生命周期用 join
十一、下一篇(最终篇)
第七篇我们讲:
并发排障与工程纪律
- 死锁四条件
- 锁顺序策略
- 竞态如何定位
- 性能如何优化
- 锁粒度怎么拆
- 工程规范 checklist
这才是“系统取向”的终章。