跳到主要内容 Linux 线程池封装与实现详解 | 极客日志
C++ 算法
Linux 线程池封装与实现详解 本文介绍了在 Linux 环境下使用 pthread 库实现线程池的完整过程。内容包括线程类的封装、互斥锁与条件变量的封装(LockGuard)、日志系统的实现、任务类的定义以及两种线程池实现方案:普通构造函数初始化和懒汉模式单例初始化。通过具体的 C++ 代码示例展示了如何管理线程生命周期、任务队列同步及资源清理,适用于理解多线程并发编程的基础架构。
一、线程的封装
这里模仿 C++ 中的线程,将 Linux 中的线程封装了一下,具体实现请看下面的代码。
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
using namespace std;
template <class T >
using func_t = function<void (T&)>;
template <class T >
class Thread {
public :
Thread (const string &threadname, func_t <T> func, const T& data)
: _pid(0 ), _threadname(threadname), _func(func), isrunning (false ), _data(data) {}
static void * ThreadRoutine (void * arg) {
Thread *pt = (Thread *)arg;
pt->_func(pt->_data);
return nullptr ;
}
bool Start () {
n = (&_pid, , ThreadRoutine, );
(n == ) {
isrunning = ;
;
} {
;
}
}
{
(!isrunning) ;
(_pid, );
}
{ isrunning; }
{ _threadname; }
~ () {}
:
_pid;
string _threadname;
isrunning;
<T> _func;
T _data;
};
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
int
pthread_create
nullptr
this
if
0
true
return
true
else
return
false
bool Join ()
if
return
false
return
pthread_join
nullptr
bool IsRunning ()
return
string ThreadName ()
return
Thread
private
pthread_t
bool
func_t
二、锁的封装 为了方便后面锁的申请和释放,这里将锁进行了简单的封装,创建对象时,就申请锁,当出了作用域以后,析构对象时就将锁释放,也可以减少死锁出现的情况。
#pragma once
#include <iostream>
class Mutex {
public :
Mutex (pthread_mutex_t * lock) : pmutex (lock) {}
void Lock () { pthread_mutex_lock (pmutex); }
void Unlock () { pthread_mutex_unlock (pmutex); }
~Mutex () {}
public :
pthread_mutex_t * pmutex;
};
class LockGuard {
public :
LockGuard (pthread_mutex_t * lock) : mutex (lock) { mutex.Lock (); }
~LockGuard () { mutex.Unlock (); }
public :
Mutex mutex;
};
三、日志的实现 为了后面输出信息更加方便编写和阅读,这里简单的实现了一个日志的功能,它可以在线程要输出内容的前面加上日志等级、输出时间、线程 ID 等信息,信息可以根据选择输出到显示屏上、一个文件中和根据日志等级输出到多个文件中,具体实现请看下面的代码。
#pragma once
#include "LockGuard.hpp"
#include <iostream>
#include <string>
#include <stdarg.h>
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
using namespace std;
enum {
Debug = 0 ,
Info,
Warning,
Error,
Fatal
};
enum {
Screen = 10 ,
OneFile,
ClassFile
};
string LevelToString (int level) {
switch (level) {
case Debug: return "Debug" ;
case Info: return "Info" ;
case Warning: return "Warning" ;
case Error: return "Error" ;
case Fatal: return "Fatal" ;
default : return "Unknow" ;
}
}
const char * default_filename = "log." ;
const int default_style = Screen;
const char * defaultdir = "log" ;
class Log {
public :
Log () : style (default_style), filename (default_filename) {
pthread_mutex_init (&_log_mutex, nullptr );
}
void SwitchStyle (int sty) { style = sty; }
void WriteLogToOneFile (const string &logname, const string &logmessage) {
int fd = open (logname.c_str (), O_CREAT | O_WRONLY | O_APPEND, 0666 );
if (fd == -1 ) return ;
{
LockGuard lockguard (&_log_mutex) ;
write (fd, logmessage.c_str (), logmessage.size ());
}
close (fd);
}
void WriteLogToClassFile (const string &levelstr, const string &logmessage) {
mkdir (defaultdir, 0775 );
string name = defaultdir;
name += "/" ;
name += filename;
name += levelstr;
WriteLogToOneFile (name, logmessage);
}
void WriteLog (int level, const string &logmessage) {
switch (style) {
case Screen: {
LockGuard lockguard (&_log_mutex) ;
cout << logmessage;
}
break ;
case OneFile:
WriteLogToClassFile ("All" , logmessage);
break ;
case ClassFile:
WriteLogToClassFile (LevelToString (level), logmessage);
break ;
default :
break ;
}
}
string GetTime () {
time_t CurrentTime = time (nullptr );
struct tm * curtime = localtime (&CurrentTime);
char time[128 ];
snprintf (time, sizeof (time), "%d-%d-%d %d:%d:%d" , curtime->tm_year + 1900 , curtime->tm_mon + 1 , curtime->tm_mday, curtime->tm_hour, curtime->tm_min, curtime->tm_sec);
return time;
}
void LogMessage (int level, const char * format, ...) {
char left[1024 ];
string Levelstr = LevelToString (level).c_str ();
string Timestr = GetTime ().c_str ();
string Idstr = to_string (getpid ());
snprintf (left, sizeof (left), "[%s][%s][%s] " , Levelstr.c_str (), Timestr.c_str (), Idstr.c_str ());
va_list args;
va_start (args, format);
char right[1024 ];
vsnprintf (right, sizeof (right), format, args);
string logmessage = left;
logmessage += right;
WriteLog (level, logmessage);
va_end (args);
}
~Log () { pthread_mutex_destroy (&_log_mutex); };
private :
int style;
string filename;
pthread_mutex_t _log_mutex;
};
Log log;
class Conf {
public :
Conf () { log.SwitchStyle (Screen); }
~Conf () {}
};
Conf conf;
四、任务的实现 可能大家没有学习过网络,这里就设计一个简单的运算类,作为线程需要传递和执行的任务,具体实现请看下面的代码。
#pragma once
#include <string>
using namespace std;
const int defaultresult = 0 ;
enum {
ok = 0 ,
div_zero,
mod_zero,
unknow
};
string ops = "+-*/%" ;
class Task {
public :
Task () {}
Task (int x, int y, char op) : data_x (x), data_y (y), _op(op), result (defaultresult), code (ok) {}
void Run () {
switch (_op) {
case '+' : result = data_x + data_y; break ;
case '-' : result = data_x - data_y; break ;
case '*' : result = data_x * data_y; break ;
case '/' : {
if (data_y == 0 ) code = div_zero;
else result = data_x / data_y;
}
break ;
case '%' : {
if (data_y == 0 ) code = mod_zero;
else result = data_x % data_y;
}
break ;
default : code = unknow; break ;
}
}
void operator () () { Run (); }
string PrintTask () {
string s;
s += to_string (data_x);
s += _op;
s += to_string (data_y);
s += " = ?" ;
return s;
}
string PrintResult () {
string s;
s += to_string (data_x);
s += _op;
s += to_string (data_y);
s += " = " ;
s += to_string (result);
s += " [" ;
s += to_string (code);
s += "]" ;
return s;
}
~Task () {}
private :
int data_x;
int data_y;
char _op;
int result;
int code;
};
五、线程池的实现
5.1 线程池的实现及测试
#pragma once
#include <string>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
using namespace std;
const int default_threadnum = 3 ;
class ThreadDate {
public :
ThreadDate (const string &name) : threadname (name) {}
~ThreadDate () {}
public :
string threadname;
};
template <class T >
class ThreadPool {
public :
ThreadPool (int num = default_threadnum) : _threadnum(num) {
pthread_mutex_init (&_mutex, nullptr );
pthread_cond_init (&_cond, nullptr );
for (int i = 0 ; i < _threadnum; i++) {
string threadname = "thread-" + to_string (i + 1 );
ThreadDate td (threadname) ;
_threads.push_back (Thread <ThreadDate>(threadname, bind (&ThreadPool<T>::ThreadRun, this , placeholders::_1), td));
log.LogMessage (Info, "%s is create\n" , threadname.c_str ());
}
}
bool Start () {
for (auto &t : _threads) {
t.Start ();
log.LogMessage (Info, "%s , is running...\n" , t.ThreadName ().c_str ());
}
return true ;
}
void Join () {
for (auto &t : _threads) {
t.Join ();
}
}
void Thread_Wait (const ThreadDate &td) {
pthread_cond_wait (&_cond, &_mutex);
log.LogMessage (Debug, "no task , %s is sleeping...\n" , td.threadname.c_str ());
}
void Thread_Wakeup () {
pthread_cond_signal (&_cond);
}
bool Push (T &in) {
LockGuard lockguard (&_mutex) ;
_q.push (in);
Thread_Wakeup ();
log.LogMessage (Debug, "other thread push a task , task is %s\n" , in.PrintTask ().c_str ());
return true ;
}
void ThreadRun (const ThreadDate &td) {
while (1 ) {
T t;
{
LockGuard lockguard (&_mutex) ;
while (_q.empty ()) {
Thread_Wait (td);
log.LogMessage (Debug, "haven task , %s is wakeup\n" , td.threadname.c_str ());
}
t = _q.front ();
_q.pop ();
}
t.Run ();
log.LogMessage (Debug, "%s handler task %s done , result is %s\n" , td.threadname.c_str (), t.PrintTask ().c_str (), t.PrintResult ().c_str ());
}
}
~ThreadPool () {
pthread_mutex_destroy (&_mutex);
pthread_cond_destroy (&_cond);
}
private :
queue<T> _q;
vector<Thread<ThreadDate>> _threads;
int _threadnum;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};
#include <iostream>
#include <memory>
#include <time.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
using namespace std;
int main () {
unique_ptr<ThreadPool<Task>> tp (new ThreadPool <Task>());
tp->Start ();
srand ((uint64_t )time (nullptr ) ^ getpid ());
while (1 ) {
int data1 = rand () % 100 ;
usleep (10000 );
int data2 = rand () % 100 ;
usleep (10000 );
char op = ops[rand () % ops.size ()];
Task t (data1, data2, op) ;
tp->Push (t);
sleep (1 );
}
tp->Join ();
return 0 ;
}
5.2 使用懒汉模式实现线程池及测试 懒汉模式实现了延迟初始化,即只有在第一次需要使用实例对象时才进行创建。这种方式可以避免在类加载时就创建实例对象,从而节省系统资源。
#pragma once
#include <string>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Thread.hpp"
#include "Log.hpp"
#include "LockGuard.hpp"
using namespace std;
const int default_threadnum = 3 ;
class ThreadDate {
public :
ThreadDate (const string &name) : threadname (name) {}
~ThreadDate () {}
public :
string threadname;
};
template <class T >
class ThreadPool {
private :
ThreadPool (int num = default_threadnum) : _threadnum(num) {
pthread_mutex_init (&_mutex, nullptr );
pthread_cond_init (&_cond, nullptr );
for (int i = 0 ; i < _threadnum; i++) {
string threadname = "thread-" + to_string (i + 1 );
ThreadDate td (threadname) ;
_threads.push_back (Thread <ThreadDate>(threadname, bind (&ThreadPool<T>::ThreadRun, this , placeholders::_1), td));
log.LogMessage (Info, "%s is create\n" , threadname.c_str ());
}
}
ThreadPool (const ThreadPool<T>& tp) = delete ;
ThreadPool<T>& operator =(const ThreadPool<T>& tp) = delete ;
public :
static ThreadPool<T>* GetInstance (int num = default_threadnum) {
if (instance == nullptr ) {
LockGuard lockguard (&_instance_mutex) ;
if (instance == nullptr ) {
instance = new ThreadPool <T>();
}
}
return instance;
}
bool Start () {
for (auto &t : _threads) {
t.Start ();
log.LogMessage (Info, "%s , is running...\n" , t.ThreadName ().c_str ());
}
return true ;
}
void Join () {
for (auto &t : _threads) {
t.Join ();
}
}
void Thread_Wait (const ThreadDate &td) {
pthread_cond_wait (&_cond, &_mutex);
log.LogMessage (Debug, "no task , %s is sleeping...\n" , td.threadname.c_str ());
}
void Thread_Wakeup () {
pthread_cond_signal (&_cond);
}
bool Push (T &in) {
LockGuard lockguard (&_mutex) ;
_q.push (in);
Thread_Wakeup ();
log.LogMessage (Debug, "other thread push a task , task is %s\n" , in.PrintTask ().c_str ());
return true ;
}
void ThreadRun (const ThreadDate &td) {
while (1 ) {
T t;
{
LockGuard lockguard (&_mutex) ;
while (_q.empty ()) {
Thread_Wait (td);
log.LogMessage (Debug, "haven task , %s is wakeup\n" , td.threadname.c_str ());
}
t = _q.front ();
_q.pop ();
}
t.Run ();
log.LogMessage (Debug, "%s handler task %s done , result is %s\n" , td.threadname.c_str (), t.PrintTask ().c_str (), t.PrintResult ().c_str ());
}
}
~ThreadPool () {
pthread_mutex_destroy (&_mutex);
pthread_cond_destroy (&_cond);
}
private :
queue<T> _q;
vector<Thread<ThreadDate>> _threads;
int _threadnum;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T>* instance;
static pthread_mutex_t _instance_mutex;
};
template <class T > ThreadPool<T>* ThreadPool<T>::instance = nullptr ;
template <class T > pthread_mutex_t ThreadPool<T>::_instance_mutex = PTHREAD_MUTEX_INITIALIZER;
#include <iostream>
#include <memory>
#include <time.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
using namespace std;
int main () {
ThreadPool<Task>::GetInstance ()->Start ();
srand ((uint64_t )time (nullptr ) ^ getpid ());
while (1 ) {
usleep (10000 );
int data2 = rand () % 100 ;
usleep (10000 );
char op = ops[rand () % ops.size ()];
Task t (data1, data2, op) ;
ThreadPool<Task>::GetInstance ()->Push (t);
sleep (1 );
}
ThreadPool<Task>::GetInstance ()->Join ();
return 0 ;
}