实现线程池
线程池的核心思路与进程池类似,包含一个任务队列和若干工作线程。用户向任务队列添加任务,工作线程从队列中获取并执行。
ThreadPool 类设计
构造函数
启动线程池分为两步:创建线程对象和启动线程。在构造函数中创建 Thread 对象,传递回调函数和线程名。由于成员函数默认包含 this 指针,而封装的 Thread 类回调设计为无参无返回值,需使用 Lambda 表达式或包装器解决参数匹配问题。
ThreadPool(int threadnum = default_threadnum) : _is_running(false), _threadnum(threadnum)
{
for (int i = 0; i < _threadnum; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back([this](const std::string &name){ this->Routine(name); }, name);
}
}
Start 接口
构造函数创建线程完成后,调用 Start 接口启动线程,防止重复启动。
void Start()
{
if (_is_running) return;
_is_running = true;
for (auto &t : _threads)
{
t.Start();
}
}
线程池接入日志
在线程创建、启动、等待、回收等关键步骤输出日志,便于调试。
初步实现源码及效果图
ThreadPool.hpp
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <unistd.h>
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
const static int default_threadnum = 3;
template<typename T>
class ThreadPool
{
private:
void Routine(const std::string &name)
{
while (1)
{
T t;
{
LockGuard lockguard(&_lock);
while (QueueIsEmpty() && _is_running)
{
_wait_thread_num++;
_cond.Wait(_lock);
_wait_thread_num--;
}
if (QueueIsEmpty() && !_is_running)
{
break;
}
t = _q.front();
_q.pop();
}
t();
}
}
public:
ThreadPool(int threadnum = default_threadnum) : _is_running(false), _threadnum(threadnum), _wait_thread_num(0)
{
for (int i = 0; i < _threadnum; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back([this](const std::string &name){ this->Routine(name); }, name);
}
}
void Start()
{
if (_is_running) return;
_is_running = true;
for (auto &t : _threads)
{
t.Start();
}
}
void Stop()
{
if (!_is_running) return;
_is_running = false;
if (_wait_thread_num)
{
_cond.NotifyAll();
}
}
void Wait()
{
for (auto &t : _threads)
{
t.Join();
}
}
void Enqueue(const T &t)
{
if (!_is_running) return;
{
LockGuard lockguard(&_lock);
_q.push(t);
if (_wait_thread_num > 0)
{
_cond.NotifyOne();
}
}
}
~ThreadPool() {}
private:
std::queue<T> _q;
std::vector<Thread> _threads;
int _threadnum;
int _wait_thread_num;
Mutex _lock;
Cond _cond;
bool _is_running;
bool QueueIsEmpty()
{
return _q.empty();
}
};
main.cpp
#include "ThreadPool.hpp"
#include <memory>
int main()
{
std::unique_ptr<ThreadPool<int>> tp = std::make_unique<ThreadPool<int>>(5);
tp->Start();
sleep(5);
tp->Stop();
tp->Wait();
return 0;
}
代码执行逻辑分析
- 构造阶段:仅创建
Thread 对象,不启动线程,不执行回调。
- 启动阶段:
ThreadPool::Start() 调用 Thread::Start(),底层触发 pthread_create。
- 回调执行:
pthread_create 传入当前 Thread 对象指针,pthread_routine 将其强转为 Thread*,调用存储的 Lambda 函数,最终执行 ThreadPool::Routine。
实现回调函数 Routine
真正的回调函数应从任务队列获取任务并消费。访问临界资源(如任务队列)需加锁,但任务执行本身无需加锁。
void Routine(const std::string &name)
{
while (1)
{
T t;
{
LockGuard lockguard(&_lock);
while (QueueIsEmpty() && _is_running)
{
_wait_thread_num++;
_cond.Wait(_lock);
_wait_thread_num--;
}
if (QueueIsEmpty() && !_is_running)
{
break;
}
t = _q.front();
_q.pop();
}
t();
}
}
enqueue 接口实现
当用户往队列放入任务后,需唤醒一个休眠线程去消费。通过 _wait_thread_num 记录休眠线程数量,唤醒时通知条件变量。
void Enqueue(const T &t)
{
if (!_is_running) return;
{
LockGuard lockguard(&_lock);
_q.push(t);
if (_wait_thread_num > 0)
{
_cond.NotifyOne();
}
}
}
线程池退出 stop 接口优化
直接强制退出可能导致线程正在处理任务或卡在等待状态。优化方案是设置 _is_running = false 并唤醒所有休眠线程,让线程自行判断退出条件。
void Stop()
{
if (!_is_running) return;
_is_running = false;
if (_wait_thread_num)
{
_cond.NotifyAll();
}
}
线程安全和重入问题
线程安全:多个线程访问共享资源时能正确执行,互不干扰。通常涉及全局变量或静态变量且无锁保护时易出问题。
重入:同一函数被不同执行流调用,前一个流程未结束,另一个再次进入。若结果不受影响则为可重入函数。
结论
- 可重入函数一定是线程安全的。
- 线程安全不一定是可重入的(例如单进程响应信号场景下可能产生死锁)。
- 含有全局变量的函数既不是线程安全也不是可重入的。
- 对临界资源加锁可使函数线程安全,但若锁未释放即重入则会产生死锁,导致不可重入。
死锁
死锁是指一组进程中各进程均占有不会释放的资源,因互相申请被其他进程占用的资源而处于永久等待状态。
死锁四个必要条件
- 互斥条件:资源每次只能被一个执行流使用。
- 请求与保持条件:执行流因请求资源阻塞时,对已获资源保持不放。
- 不剥夺条件:已获得资源在使用完之前不能被强行剥夺。
- 循环等待条件:若干执行流之间形成头尾相接的循环等待关系。
避免死锁
核心思想是破坏上述一个或多个必要条件:
- 破坏请求与保持条件:使用
pthread_mutex_trylock,申请锁失败后不阻塞而是返回错误码,释放已持锁重新申请。
- 破坏不剥夺条件:设计仲裁函数,优先级较低的线程主动释放锁。
- 预防循环等待:保证资源一次性分配,使用超时机制,或统一加锁顺序。
STL、智能指针和线程安全
STL 容器:默认不是线程安全的。设计初衷是极致性能,加锁会严重影响性能。多线程环境下需调用者自行保证安全。
智能指针:
unique_ptr:作用域内生效,不涉及线程安全问题。
shared_ptr:引用计数变量存在线程安全问题,但标准库基于原子操作(CAS)保证引用计数的原子性,因此是线程安全的。