Linux的线程池
基于前文Linux的多线程-ZEEKLOG博客。
基于前文Linux的生产者消费者模型-ZEEKLOG博客
目录
1、日志
1.1 日志的概念
- 日志是计算机系统、应用程序或服务在运行过程中生成的、按时间顺序记录的事件流文件或数据集合,主要作用是监控运行状态、记录异常信息,帮助快速定位问题并支持程序员进行问题修复。它是系统维护、故障排查和安全管理的重要工具。
1.2 日志的设计
- 要设计出下面的格式:
[可读性很好的时间] [日志等级] [进程PID] [打印对应日志的源文件名] [行号] - 消息内容(支持可 变参数,即多条不同的信息) [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [20] - hello world [2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [21] - hello world [2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world- 日志等级分为:DEBUG,INFO,WARNING,ERROR,FATAL。
- 采用策略模式,通过多态的方式,可以随时选择打印到显示器,还是文件。
- 重载 << 支持 C++ 风格的日志输入,使用模版,表示支持任意类型。
logger(LogLevel::DEBUG, "main.cc", 10) << "hello world," << 3.14 << " " << 8899 << "aaaa";1.3 代码
1.3.1 Log.hpp
#pragma once #include <iostream> #include <string> #include <time.h> #include <cstdio> #include "Mutex.hpp" #include <filesystem> // C++17,方便文件管理 #include <fstream> // 方便文件内容的操作 #include <sstream> #include <memory> #include <sys/types.h> #include <unistd.h> namespace LogModule { using namespace MutexModule; std::string GetTime() { time_t curr = time(nullptr); struct tm curr_tm; localtime_r(&curr, &curr_tm); char time_str[128]; snprintf(time_str, sizeof(time_str), "%4d-%02d-%02d %02d:%02d:%02d", curr_tm.tm_year + 1900, curr_tm.tm_mon + 1, curr_tm.tm_mday, curr_tm.tm_hour, curr_tm.tm_min, curr_tm.tm_sec); return time_str; } enum class LogLevel { DEBUG, INFO, WARNING, ERROR, FATAL }; std::string LogLeveltoString(LogLevel log_level) { if (log_level == LogLevel::DEBUG) return "DEBUG"; else if (log_level == LogLevel::INFO) return "INFO"; else if (log_level == LogLevel::WARNING) return "WARNING"; else if (log_level == LogLevel::ERROR) return "ERROR"; else if (log_level == LogLevel::FATAL) return "FATAL"; else return "UNKNOWN"; } class LogStrategy { public: // 纯虚函数,不需要实现,不能创建对象 virtual void SyncLog(const std::string &message) = 0; }; class ConsoleLogStrategy : public LogStrategy { public: virtual void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::cout << message << std::endl; } private: Mutex _mutex; }; const std::string default_path = "./log/"; const std::string default_name = "my.log"; const std::string gsep = "\r\n"; // 全局的分隔符 class FileLogStrategy : public LogStrategy { public: FileLogStrategy(const std::string &file_path = default_path, const std::string &file_name = default_name) : _file_path(file_path), _file_name(file_name) { LockGuard lockguard(_mutex); if (std::filesystem::exists(_file_path)) { return; } try { std::filesystem::create_directories(_file_path); } catch (const std::filesystem::filesystem_error &e) { std::cerr << e.what() << '\n'; } } virtual void SyncLog(const std::string &message) override { LockGuard lockguard(_mutex); std::string file_path_name = _file_path + (_file_path.back() == '/' ? "" : "/") + _file_name; // 以追加写入的方式打开 std::ofstream out(file_path_name, std::ios::app); if (!out.is_open()) { return; } out << message << gsep; } public: std::string _file_path; std::string _file_name; Mutex _mutex; }; class Logger { public: Logger() { EnableConsoleLogStrategy(); } void EnableConsoleLogStrategy() { _fflush_strategy = std::make_unique<ConsoleLogStrategy>(); } void EnableFileLogStrategy() { _fflush_strategy = std::make_unique<FileLogStrategy>(); } class LogMessage { public: LogMessage(LogLevel log_level, const std::string &file_name, int line_number, const Logger *logger) : _logger(logger) { std::stringstream ss; ss << "[" << GetTime() << "] " << "[" << LogLeveltoString(log_level) << "] " << "[" << getpid() << "] " << "[" << file_name << "] " << "[" << line_number << "] - "; _message = ss.str(); } template <typename T> LogMessage& operator<<(const T &info) { std::stringstream ss; ss << info; _message += ss.str(); return *this; } ~LogMessage() { if (_logger->_fflush_strategy) { _logger->_fflush_strategy->SyncLog(_message); } } private: std::string _message; const Logger *_logger; }; LogMessage operator()(LogLevel log_level, const std::string &file_name, int line_number) { return LogMessage(log_level, file_name, line_number, this); } private: std::unique_ptr<LogStrategy> _fflush_strategy = nullptr; }; // 全局日志对象 Logger logger; // 使用宏,简化用户操作,获取文件名和行号 #define LOG(level) logger(level, __FILE__, __LINE__) #define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy() #define Enable_File_Log_Strategy() logger.EnableFileLogStrategy() }1.3.2 Mutex.hpp
#pragma once #include <pthread.h> namespace MutexModule { class Mutex { public: Mutex() { pthread_mutex_init(&_mutex, nullptr); } void Lock() { pthread_mutex_lock(&_mutex); } void Unlock() { pthread_mutex_unlock(&_mutex); } pthread_mutex_t *Get() { return &_mutex; } ~Mutex() { pthread_mutex_destroy(&_mutex); } private: pthread_mutex_t _mutex; }; // RAII,资源的初始化与释放与对象的生命周期绑定 class LockGuard { public: LockGuard(Mutex &mutex) : _mutex(mutex) { _mutex.Lock(); } ~LockGuard() { _mutex.Unlock(); } private: Mutex &_mutex; }; }1.4 注意事项
日志等级+日志写入的换行符:为什么使用enum class,不用enum?因为enum class,不能隐式转换为整数(要转成整数,必须显示转化),也不能与其他枚举类型的成员直接比较,提高了类型安全性;enum class的成员必须通过枚举名访问(如LogLevel::DEBUG),避免了命名污染。为什么是/r/n?/r是回车(回到这一行的开头),/n是换行(换到下一行),有的操作系统,/n就有回车+换行的作用,有些操作系统不是。
一条日志信息的对象+多次<<:LogMessage,一条日志信息,RAII思想(利用类的构造(初始化)和析构(收尾))的日志格式化和刷新,作为内部类(独立于外部类,只是受外部类的访问限定符和外部类类域限制),外部类传this指针给内部类,内部类能访问外部类的私有和保护成员(内部类是外部类的"友元"),访问外部类的策略,而反过来,内部类传this指针给外部类,外部类无法访问内部类的私有或保护成员。Logger中的LogMessage operator()(),return LogMessage临时对象,而LogMessage中的LogMessage& operator<<(),也return *this,返回当前LogMessage的引用,所以,一个LogMessage临时对象可以多次<<,如:logger(LogLevel::DEBUG, "main.cc", 10) << "hello world," << 3.14 << " " << 8899 << "aaaa";。
2、线程池
2.1 线程池的概念
- 线程池是一种多线程处理模式,其核心思想是通过对线程资源的统一管理和复用,来优化程序性能,避免因线程过多而引发的各种问题。
2.2 线程池的设计
- 线程池分固定线程数量和浮动线程数量,本文选择固定线程个数的线程池。
- 采用单例模式,某些类,只应该具有⼀个对象(实例),就称之为单例。在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中,此时往往要用一个单例的类来管理这些数据。
- 使用懒汉方式,核心是"延时加载",从而优化服务器的启动速度。

2.3 代码
2.3.1 Main.cc
#include <functional> #include "Log.hpp" #include "ThreadPool.hpp" using namespace LogModule; using namespace ThreadPoolModule; void Download() { std::cout << "下载一个任务" << std::endl; sleep(3); // 假设任务的耗时。 } using task_t = std::function<void()>; int main() { Enable_Console_Log_Strategy(); int count = 10; while (count) { sleep(1); ThreadPool<task_t>::GetInstance()->Enqueue(Download); count--; } ThreadPool<task_t>::GetInstance()->Stop(); ThreadPool<task_t>::GetInstance()->Join(); return 0; }2.3.2 ThreadPool.hpp
#pragma once #include <vector> #include <queue> #include <thread> #include <memory> #include <atomic> #include "Log.hpp" #include "Cond.hpp" #include "Mutex.hpp" namespace ThreadPoolModule { using namespace MutexModule; using namespace CondModule; using namespace LogModule; const int gnum = 5; // 线程个数 template <typename T> class ThreadPool { private: ThreadPool(int num = gnum) : _num(num), _sleep_num(0), _running(true) { for (int i = 0; i < _num; ++i) { // 隐式使用了this,在成员函数内部默认是this->HandlerTask() _threads.emplace_back([this]() { HandlerTask(); }); } } ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(const ThreadPool &) = delete; public: static ThreadPool *GetInstance() { if (_inc == nullptr) { LockGuard lockguard(_inc_mutex); if (_inc == nullptr) { LOG(LogLevel::DEBUG) << "首次使用单例, 创建之...."; // _inc = std::make_unique<ThreadPool>(); // std::make_unique 是一个外部函数,无法访问 ThreadPool 类的私有构造函数 _inc.reset(new ThreadPool()); } } LOG(LogLevel::DEBUG) << "获取单例"; return _inc.get(); // 返回原始指针,不转移所有权 } void HandlerTask() { while (true) { T task; { LockGuard lockguard(_tasks_mutex); while (_tasks.empty() && _running) { ++_sleep_num; _tasks_cond.Wait(_tasks_mutex); --_sleep_num; } if (_tasks.empty() && !_running) break; task = _tasks.front(); _tasks.pop(); } task(); } } void Enqueue(const T &task) { if (_running) { LockGuard lockguard(_tasks_mutex); if (_running) { _tasks.push(task); if (_sleep_num == _num) { LOG(LogLevel::INFO) << "唤醒一个休眠线程"; _tasks_cond.Signal(); } } } } void Stop() { if (!_running) return; _running = false; if (_sleep_num) { LOG(LogLevel::INFO) << "唤醒所有休眠线程"; _tasks_cond.Broadcast(); } } void Join() { LOG(LogLevel::INFO) << "join所有线程"; for (auto &thread : _threads) { if (thread.joinable()) thread.join(); } } private: std::vector<std::thread> _threads; int _num; // 线程个数 int _sleep_num; // 线程阻塞个数 std::queue<T> _tasks; Mutex _tasks_mutex; Cond _tasks_cond; static std::unique_ptr<ThreadPool> _inc; std::atomic<bool> _running; // 线程池是否启动。 static Mutex _inc_mutex; }; template <typename T> std::unique_ptr<ThreadPool<T>> ThreadPool<T>::_inc = nullptr; template <typename T> Mutex ThreadPool<T>::_inc_mutex; }2.3.3 Cond.hpp
#pragma once #include <pthread.h> #include "Mutex.hpp" namespace CondModule { class Cond { public: // RAII,资源的初始化与释放与对象的生命周期绑定 Cond() { pthread_cond_init(&_cond,nullptr); } void Wait(MutexModule::Mutex& mutex) { pthread_cond_wait(&_cond,mutex.Get()); } void Signal() { pthread_cond_signal(&_cond); } void Broadcast() { pthread_cond_broadcast(&_cond); } ~Cond() { pthread_cond_destroy(&_cond); } private: pthread_cond_t _cond; }; }2.4 注意事项
线程池的固定线程个数:const static int gnum = 5;?这是一个在头文件(.hpp)中定义的全局常量。static 在这里表示内部链接,即该变量仅在当前编译单元(包含此头文件的 .cpp 文件)中可见。若不使用 static,当多个 .cpp 文件包含此头文件时,gnum 会被多次定义,导致链接错误(重定义)。static 确保每个包含该头文件的编译单元都有一份独立的 gnum 实例,避免冲突。但是在命名空间里面直接const int gnum = 5;,也可以,比较推荐。
单例模式+懒汉模式:构造函数私有,还要delete拷贝构造和赋值重载。第一个对象(仅有的一个)怎么来的?现在外部创建不了(构造函数私有),那么就要在没有对象的时候,在类内部创建,嗯?static?调用GetInstance()函数的时候再创建,不就是懒汉模式,延迟加载吗?所以是static(静态成员函数,不依赖于类的对象)。为什么 _inc 和 _mutex 必须被static 修饰?因为静态成员函数GetInstance() 只能访问静态成员。为什么类内部不用一个static标记,标记类的对象个数,使只有一个?可以,但是可能会更复杂。我其实也可以在类内部创建多个ThreadPool对象吧,单例模式就是自己规定只有一个?是的,通过代码约束。线程池需要加锁保证线程安全。双层判断_inc,当不为nullptr时,就不用获取锁再判断了。GetInstance() 不能返回unique_ptr,因为不能拷贝,那么返回原始指针,是否有问题?没有,GetInstance() 返回 _inc.get()(原始指针),仅提供 “访问权” 而非 “所有权”,对象的生命周期仍由 unique_ptr 负责。
线程的处理:线程获取任务后,任务已经是线程私有的了,线程处理任务不在临界区,支持并发。如果在临界区,就要等任务处理完,才轮到下一个线程。规定:队列为空 && _running == true,线程需要等待;队列为空 && _running == false,线程退出。std::atomic<>是保证单个变量的读写操作原子化、可见性,仅针对单个变量。mutex是保护一段代码(临界区)的互斥执行,针对一段逻辑。任务队列需要加锁保证线程安全。双层判断_running,当为false时,就不用获取锁再判断了。
线程池的线程的回收操作:规定:stop()后,不允许再push数据,使线程退出。因为线程一直在运行,需要将线程池的运行状态置为false;,再唤醒所有线程对象,判断为false,使所有线程退出,才能join()成功。如果底层的线程执行完了函数,"线程对象"还是活跃的吗?还要join吗?底层的线程结束了,资源回收了,但是"线程对象"仍然“关联”着那个已经结束的线程,还需要同步和清理。所以,无论底层线程是正在运行,还是已经结束,在 std::thread 对象析构之前,必须确保 joinable()从 true -> false(默认构造的(没有关联线程)或 被 move(线程所有权已转移)或 调用 join() 或 调用 detach())。如果joinable() == false,就不能再move()或join()或detach()。