跳到主要内容 Linux 环境下 C++ 线程池设计与实现 | 极客日志
C++ 算法
Linux 环境下 C++ 线程池设计与实现 线程池通过复用线程减少创建销毁开销,适用于大量短任务场景。基于 C++ 和 pthread 实现了线程池核心逻辑,包含任务队列、互斥锁及条件变量管理。同时探讨了单例模式在资源管理中的应用及其线程安全改造,分析了线程安全与可重入性的区别,并详细阐述了死锁成因(互斥、请求保持、不剥夺、循环等待)及避免策略(破坏必要条件、统一加锁顺序等)。
刀狂 发布于 2026/2/8 更新于 2026/4/18 5K 浏览Linux 线程池设计与实现
一、线程池概述
1. 线程池的认识
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程时的代价。线程池不仅可以保证内核的充分利用,还能防止过分调度。
线程池的应用场景:
需要大量的线程完成任务,且完成任务的时间比较短。比如 WEB 服务器完成网页请求这样的任务。
对性能要求苛刻的应用,比如:要求服务器迅速响应客户请求。
2. 线程池的实现
ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <memory>
#include <time.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Logger.hpp"
#include "Task.hpp"
const int default_thread_num = 5 ;
template <typename T> class ThreadPool {
:
{ _q. (); }
{
( ) {
T t; {
;
( () && _is_running) {
_wait_thread_num++;
_cond. (_lock);
_wait_thread_num--;
}
(!_is_running && ()) {
(LoggerLevel::INFO) << << name << ;
;
}
t = _q. ();
_q. ();
}
();
(LoggerLevel::DEBUG) << name << << t. ();
}
}
:
( thread_num = default_thread_num)
: _thread_num(thread_num), _is_running( ), _wait_thread_num( ) {
( i = ; i < _thread_num; ++i) {
std::string name = + std:: (i + );
_threads. ([ ]( std::string &_name) {
-> (_name);
}, name);
}
(LoggerLevel::INFO) << ;
}
{
(_is_running) ;
_is_running = ;
( &t : _threads) {
t. ();
}
}
{
(!_is_running) ;
_is_running = ;
(_wait_thread_num > ) _cond. ();
(LoggerLevel::INFO) << ;
}
{
( &t : _threads) {
t. ();
}
(LoggerLevel::INFO) << ;
}
{
(!_is_running) ;
{
;
_q. (t);
(_wait_thread_num > ) _cond. ();
}
}
~ () {}
:
std::queue<T> _q;
std::vector<Thread> _threads;
_thread_num;
Mutex _lock;
Cond _cond;
_is_running;
_wait_thread_num;
};
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
private
bool QueueIsEmpty ()
return
empty
void Routine (const std::string &name)
while
true
LockGuard lockguard (&_lock)
while
QueueIsEmpty
Wait
if
QueueIsEmpty
LOG
"线程池退出 && 任务队列为空,"
"退出"
break
front
pop
t
LOG
" handler task, "
ResultToString
public
ThreadPool
int
false
0
for
int
0
"thread-"
to_string
1
emplace_back
this
const
this
Routine
LOG
"ThreadPool obj create success"
void Start ()
if
return
true
for
auto
Start
void Stop ()
if
return
false
if
0
NotifyAll
LOG
"thread pool stop success"
void Wait ()
for
auto
Join
LOG
"thread pool wait success"
void EnQueue (const T &t)
if
return
LockGuard lockguard (&_lock)
push
if
0
NotifyOne
ThreadPool
private
int
bool
int
Task.hpp #pragma once
#include <iostream>
#include <sstream>
#include <functional>
#include <unistd.h>
class Task {
public :
Task () {}
Task (int x, int y) : a (x), b (y) {}
void Excute () { result = a + b; }
void operator () () {
Excute ();
}
std::string ResultToString () {
std::stringstream ss;
ss << a << " + " << b << " = " << result;
return ss.str ();
}
private :
int a;
int b;
int result;
};
Mutex.hpp #pragma once
#include <iostream>
#include <mutex>
#include <pthread.h>
class Mutex {
public :
Mutex () { pthread_mutex_init (&_lock, nullptr ); }
void Lock () { pthread_mutex_lock (&_lock); }
pthread_mutex_t * Get () { return &_lock; }
void Unlock () { pthread_mutex_unlock (&_lock); }
~Mutex () { pthread_mutex_destroy (&_lock); }
private :
pthread_mutex_t _lock;
};
class LockGuard {
public :
LockGuard (Mutex* _mutex) : _mutexp(_mutex) { _mutexp->Lock (); }
~LockGuard () { _mutexp->Unlock (); }
private :
Mutex* _mutexp;
};
Cond.hpp #include "Mutex.hpp"
class Cond {
public :
Cond () { pthread_cond_init (&_cond, nullptr ); }
void Wait (Mutex& lock) { pthread_cond_wait (&_cond, lock.Get ()); }
void NotifyOne () { pthread_cond_signal (&_cond); }
void NotifyAll () { pthread_cond_broadcast (&_cond); }
~Cond () { pthread_cond_destroy (&_cond); }
private :
pthread_cond_t _cond;
};
Thread.hpp #ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <pthread.h>
#include <unistd.h>
#include <sys/syscall.h>
#include "Logger.hpp"
#define get_lwp_id() syscall(SYS_gettid)
using func_t = std::function<void (const std::string& name)>;
std::string thread_name_default = "None_Name" ;
class Thread {
public :
Thread (func_t func, std::string name = thread_name_default)
: _isrunning(false ), _name(name), _func(func) {}
static void * start_routine (void * args) {
Thread* self = static_cast <Thread*>(args);
self->_isrunning = true ;
self->_lwpid = get_lwp_id ();
self->_func(self->_name);
pthread_exit ((void *)0 );
}
void Start () {
int n = pthread_create (&_tid, nullptr , start_routine, this );
if (n == 0 ) {
LOG (LoggerLevel::INFO) << "pthread_create success" ;
}
}
void Stop () {
int n = pthread_cancel (_tid);
LOG (LoggerLevel::INFO) << "thread cancel success" ;
(void )n;
}
void Join () {
if (!_isrunning) return ;
int n = pthread_join (_tid, nullptr );
if (n == 0 ) {
LOG (LoggerLevel::INFO) << "pthread_join success" ;
}
}
~Thread () {}
private :
bool _isrunning;
pthread_t _tid;
pid_t _lwpid;
std::string _name;
func_t _func;
};
#endif
Logger.hpp #pragma once
#include <iostream>
#include <filesystem>
#include <fstream>
#include <string>
#include <sstream>
#include <memory>
#include <unistd.h>
#include "Mutex.hpp"
enum class LoggerLevel { DEBUG, INFO, WARNING, ERROR, FATAL };
std::string LoggerLevelToString (LoggerLevel level) {
switch (level) {
case LoggerLevel::DEBUG: return "Debug" ;
case LoggerLevel::INFO: return "Info" ;
case LoggerLevel::WARNING: return "Warning" ;
case LoggerLevel::ERROR: return "Error" ;
case LoggerLevel::FATAL: return "Fatal" ;
default : return "Unknown" ;
}
}
std::string GetCurrentTime () {
time_t timep = time (nullptr );
struct tm currtm;
localtime_r (&timep, &currtm);
char buffer[64 ];
snprintf (buffer, sizeof (buffer), "%4d-%02d-%02d %02d-%02d-%02d" , currtm.tm_year + 1900 , currtm.tm_mon + 1 , currtm.tm_mday, currtm.tm_hour, currtm.tm_min, currtm.tm_sec);
return buffer;
}
class LogStrategy {
public :
virtual ~LogStrategy () = default ;
virtual void SyncLog (const std::string &logmessage) = 0 ;
};
class ConsoleLogStrategy : public LogStrategy {
public :
~ConsoleLogStrategy () {}
virtual void SyncLog (const std::string &logmessage) override {
LockGuard lockguard (&_lock) ;
std::cout << logmessage << std::endl;
}
private :
Mutex _lock;
};
const std::string default_dir_path_name = "log" ;
const std::string default_filename = "test.log" ;
class FileLogStrategy : public LogStrategy {
public :
FileLogStrategy (const std::string dir_path_name = default_dir_path_name,
const std::string filename = default_filename)
: _dir_path_name(dir_path_name), _filename(filename) {
if (std::filesystem::exists (_dir_path_name)) {
return ;
}
try {
std::filesystem::create_directories (_dir_path_name);
} catch (const std::filesystem::filesystem_error &e) {
std::cerr << e.what () << "\r\n" ;
}
}
~FileLogStrategy () {}
virtual void SyncLog (const std::string &logmessage) override {
LockGuard lock (&_lock) ;
std::string target = _dir_path_name;
target += '/' ;
target += _filename;
std::ofstream out (target.c_str(), std::ios::app) ;
if (!out.is_open ()) {
return ;
}
out << logmessage << "\n" ;
out.close ();
}
private :
std::string _dir_path_name;
std::string _filename;
Mutex _lock;
};
class Logger {
public :
Logger () {}
void EnableConsoleStrategy () { _strategy = std::make_unique <ConsoleLogStrategy>(); }
void EnableFileStrategy () { _strategy = std::make_unique <FileLogStrategy>(); }
class LogMessage {
public :
LogMessage (LoggerLevel level, std::string filename, int line, Logger& logger)
: _curr_time(GetCurrentTime ()), _level(level), _pid(getpid ()), _filename(filename), _line(line), _logger(logger) {
std::stringstream ss;
ss << "[" << _curr_time << "] "
<< "[" << LoggerLevelToString (_level) << "] "
<< "[" << _pid << "] "
<< "[" << _filename << "] "
<< "[" << _line << "]" << " - " ;
_loginfo = ss.str ();
}
template <typename T>
LogMessage &operator <<(const T &info) {
std::stringstream ss;
ss << info;
_loginfo += ss.str ();
return *this ;
}
~LogMessage () {
if (_logger._strategy) {
_logger._strategy->SyncLog (_loginfo);
}
}
private :
std::string _curr_time;
LoggerLevel _level;
pid_t _pid;
std::string _filename;
int _line;
std::string _loginfo;
Logger &_logger;
};
LogMessage operator () (LoggerLevel level, std::string filename, int line) {
return LogMessage (level, filename, line, *this );
}
~Logger () {}
private :
std::unique_ptr<LogStrategy> _strategy;
};
Logger logger;
#define LOG(level) logger(level, __FILE__, __LINE__)
#define EnableConsoleStrategy() logger.EnableConsoleStrategy()
#define EnableFileStrategy() logger.EnableFileStrategy()
main.cc #include "ThreadPool.hpp"
int main () {
srand ((unsigned int )time (nullptr ));
EnableConsoleStrategy ();
std::unique_ptr<ThreadPool<Task>> tq = std::make_unique<ThreadPool<Task>>(10 );
tq->Start ();
int cnt = 10 ;
while (cnt--) {
int x = rand () % 10 + 1 ;
int y = rand () % 5 + 1 ;
Task t (x, y) ;
tq->EnQueue (t);
sleep (1 );
}
tq->Stop ();
tq->Wait ();
return 0 ;
}
Makefile threadpool: main.cc
g++ -o $@ $^ -std=c++17 -lpthread
.PHONY : clean
clean:
rm -f threadpool
二、单例模式 在很多的服务器开发场景中,经常需要让服务器加载很多的数据(上百 G)到内存中,此时往往需要一个单例的类来管理这些数据。
所谓饿汉方式就是吃完饭,立刻洗碗,这就是饿汉方式,因为下一顿吃的时候就可以立即拿着碗吃饭。
懒汉方式就是吃完饭,把碗放下,下次吃的时候再洗碗,这就是懒汉。
饿汉方式实现单例模式 template <typename T> class Singleton {
static T data;
public :
static T* GetInstance () { return &data; }
};
该类里面有一个静态成员,静态成员被编译在全局区,程序被编译时就已经有了虚拟地址,在逻辑上可以认为已经有了对象。
懒汉实现方式单例模式 template <typename T> class Singleton {
static T* inst;
public :
static T* GetInstance () {
if (inst == NULL ) {
inst = new T ();
}
return inst;
}
};
这个类里面有一个静态指针,在编译时也有虚拟地址,但是是静态指针的地址,并不是成员的虚拟地址,而是当调用 GetInstance 函数时,才会动态开辟空间。这就是懒汉模式。
现在,我们就来将刚才的线程池进行改造,设计出一个单例模式。
这个就是单例线程池 。但是,这个安全吗?是不安全的,如果是多个线程调用 GetInstance,就有可能创建出多份对象,这是不安全的。
所以,我们应该加一把锁,保证多线程调用时也是安全的。
三、线程安全和重入问题 线程安全: 多个线程在访问共享资源时,能够正确的执行,不会相互干扰或破坏彼此的执行结果。
重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其它的执行流再次进入,称之为重入。
不保护共享变量的函数
函数状态随着被调用,状态发生变化的函数
返回指向静态变量指针的函数
调用线程不安全函数的函数
每个线程对全局变量和静态变量只有读取权限,而没有写入权限,一般来说这些线程是安全的。
类或者接口对于线程来说都是原子性操作。
多个线程之间的切换不会导致该接口的执行结果存在二义性。
调用了 malloc,free 函数,因为 malloc 函数底层是用全局链表来管理堆的。
调用标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构。
可重入函数体内使用了静态的数据结构。
不使用全局变量和静态变量
不使用 malloc 和 new 开辟出来的空间
不调用不可重入函数
不返回静态数据和全局数据,所有数据都由函数的调用者提供
使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全的联系: 函数是可重入的,那就是线程安全的。
可重入与线程安全的区别: 线程安全不一定是可重入的,而可重入函数一定是线程安全的。
比如说,主线程(只有一个线程)执行了信号捕捉方法,可因为信号的到来,也执行了信号捕捉方法,如果这个方法内部是加了锁的,那么当第一次进入函数内部申请锁之后,又因为信号的到来再一次申请锁,那不就把主线程挂载起来了吗!本来主线程就没有释放锁,又再一次申请锁,自己把自己挂载起来,不就出问题了,这不就造成死锁问题了吗!
四、常见锁概念
1. 死锁 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其它进程所占用不会释放的资源而处于一种永久等待的状态。
比如,现在有两个线程 A,B,任何一个线程要访问临界资源,都必须同时申请到两把锁。线程 A 申请到了锁 1,线程 B 申请到了锁 2,现在线程 A 要申请锁 2,线程 B 要申请锁 1,这时候就会出现死锁。因为线程 A,线程 B 分别申请了锁 1 和锁 2,但是没有释放自己的锁,所以再一次申请对方的锁就会被阻塞,如果线程 A,线程 B 不释放自己的锁,那么就会一直被阻塞,循环等待。
2. 形成死锁的四个必要条件
互斥条件: 一个资源每次只能被一个执行流使用。
请求与保持条件: 一个执行流因请求资源而被阻塞时,对已获得的资源保持不放。
比如:线程 A 申请到了锁 1,还想申请锁 2,线程 B 将锁 2 申请走,还想申请锁 1。
不剥夺条件: 一个执行流已获得的资源,在未使用完之前,不能被强行剥夺。
比如:线程 A 申请到了锁,访问临界资源,线程 B 此刻申请锁就会被阻塞,不会申请成功,OS 不能直接把锁给线程 B。
循环等待条件: 若干执行流之间形成一种头尾相接的循环等待资源的关系。
3. 如何避免死锁 那么,形成死锁的条件我们已经知道了,该如何避免死锁呢?
只要破坏形成死锁的四个必要条件中的任何一个条件即可。
破坏循环等待条件问题: 资源一次性分配,使用超时机制,加锁顺序一致。
避免锁未释放的场景: 实际开发中,通常采用'固定加锁顺序'来破坏循环等待条件,并结合 RAII 机制确保锁在异常情况下也能被正确释放。