Linux之线程池

Linux之线程池

一、线程池

1. 线程池的认识

线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程时的代价。线程池不仅可以保证内核的充分利用,还能防止过分调度

线程池的应用场景:

需要大量的线程完成任务,且完成任务的时间比较短。比如WEB服务器完成网页请求这样的任务。

对性能要求苛刻的应用,比如:要求服务器迅速响应客户请求。

2. 线程池的实现

ThreadPool.hpp

#pragmaonce#include<iostream>#include<vector>#include<queue>#include<string>#include<memory>#include<time.h>#include<unistd.h>#include"Mutex.hpp"#include"Cond.hpp"#include"Thread.hpp"#include"Logger.hpp"#include"Task.hpp"constint default_thread_num =5; template <typename T> class ThreadPool { private: bool QueueIsEmpty(){return _q.empty();}voidRoutine(const std::string &name){while(true){ T t;{// 把公共资源获取到线程私有 LockGuard lockguard(&_lock);while(QueueIsEmpty()&& _is_running){// 队列为空,线程需要去等待 _wait_thread_num++; _cond.Wait(_lock); _wait_thread_num--;}if(!_is_running &&QueueIsEmpty()){LOG(LoggerLevel::INFO)<<" 线程池退出 && 任务队列为空, "<< name <<" 退出";break;}// 一定有任务// 获取任务,消耗历史任务 t = _q.front(); _q.pop();}t();// 执行任务,不需要在临界区执行,临界区保护的是类内的资源LOG(LoggerLevel::DEBUG)<< name <<" handler task, "<< t.ResultToString();}} public:ThreadPool(int thread_num = default_thread_num):_thread_num(thread_num),_is_running(false),_wait_thread_num(0){for(int i =0; i < _thread_num;++i){ std::string name ="thread-"+ std::to_string(i +1); _threads.emplace_back([this](const std::string &_name){ this->Routine(_name);}, name);// Thread t([this]() {// this->hello();// }, name);// _threads.push_back(std::move(t));}LOG(LoggerLevel::INFO)<<"ThreadPool obj create success";}voidStart(){if(_is_running){return;} _is_running = true;for(auto&t : _threads){ t.Start();}}// 核心思想:让线程走正常的唤醒逻辑退出// 线程池要退出// 1.如果被唤醒 && 任务队列里没有任务 = 线程退出// 2.如果被唤醒 && 任务队列里有任务 = 线程不能立即退出,应该让线程把任务处理完,在退出// 3.如果线程本身没有被休眠,我们应该让它把能处理的任务全部处理完成,在退出// 3 || 2 --> 1voidStop(){if(!_is_running)return; _is_running = false;if(_wait_thread_num >0) _cond.NotifyAll();// if (!_is_running)// return;// _is_running = false;// for (auto &t : _threads)// {// t.Stop();// }LOG(LoggerLevel::INFO)<<"thread pool stop success";}voidWait(){for(auto&t : _threads){ t.Join();}LOG(LoggerLevel::INFO)<<"thread pool wait success";}voidEnQueue(const T &t){//线程池退出,不要让用户继续添加任务if(!_is_running)return;{ LockGuard lockguard(&_lock); _q.push(t);if(_wait_thread_num >0) _cond.NotifyOne();}}~ThreadPool(){} private: std::queue<T> _q; std::vector<Thread> _threads;int _thread_num; Mutex _lock; Cond _cond; bool _is_running;int _wait_thread_num;};

Task.hpp

#pragmaonce#include<iostream>#include<sstream>#include<functional>#include<unistd.h> class Task { public:Task(){}Task(int x,int y):a(x),b(y){}voidExcute(){ result = a + b;}voidoperator()(){// sleep(1);Excute();} std::string ResultToString(){ std::stringstream ss; ss << a <<" + "<< b <<" = "<< result;return ss.str();}// void Print()// {// std::cout << a << " + " << b << " = " << result << std::endl;// } private:int a;int b;int result;};// using func_t = std::function<void()>;// void PrintLog()// {// std::cout << "hello 我是一个日志任务" << std::endl;// }

Mutex.hpp

#pragmaonce#include<iostream>#include<mutex>#include<pthread.h> class Mutex { public:Mutex(){pthread_mutex_init(&_lock, nullptr);}voidLock(){pthread_mutex_lock(&_lock);}pthread_mutex_t*Get(){return&_lock;}voidUnlock(){pthread_mutex_unlock(&_lock);}~Mutex(){pthread_mutex_destroy(&_lock);} private:pthread_mutex_t _lock;}; class LockGuard { public:LockGuard(Mutex* _mutex):_mutexp(_mutex){ _mutexp->Lock();}~LockGuard(){ _mutexp->Unlock();} private: Mutex* _mutexp;};

Cond.hpp

#include"Mutex.hpp" class Cond { public:Cond(){pthread_cond_init(&_cond, nullptr);}voidWait(Mutex& lock){pthread_cond_wait(&_cond, lock.Get());}voidNotifyOne(){pthread_cond_signal(&_cond);}voidNotifyAll(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);} private:pthread_cond_t _cond;};

Thread.hpp

#ifndef__THREAD_HPP__#define__THREAD_HPP__#include<iostream>#include<vector>#include<string>#include<functional>#include<pthread.h>#include<unistd.h>#include<sys/syscall.h>#include"Logger.hpp"#defineget_lwp_id()syscall(SYS_gettid) using func_t= std::function<void(const std::string& name)>; std::string thread_name_default ="None_Name"; class Thread { public:Thread(func_t func, std::string name = thread_name_default):_isrunning(false),_name(name),_func(func){}staticvoid*start_routine(void* args){ Thread* self = static_cast<Thread*>(args); self->_isrunning = true; self->_lwpid =get_lwp_id(); self->_func(self->_name);pthread_exit((void*)0);}voidStart(){int n =pthread_create(&_tid, nullptr, start_routine, this);if(n ==0){LOG(LoggerLevel::INFO)<<"pthread_create success";}}voidStop(){int n =pthread_cancel(_tid);LOG(LoggerLevel::INFO)<<"thread cancel success";(void)n;}voidJoin(){if(!_isrunning)return;int n =pthread_join(_tid, nullptr);if(n ==0){LOG(LoggerLevel::INFO)<<"pthread_join success";}}~Thread(){} private: bool _isrunning;pthread_t _tid;pid_t _lwpid; std::string _name;func_t _func;};#endif

Logger.hpp

#pragmaonce#include<iostream>#include<filesystem>#include<fstream>#include<string>#include<sstream>#include<memory>#include<unistd.h>#include"Mutex.hpp"enumclass LoggerLevel { DEBUG, INFO, WARNING, ERROR, FATAL }; std::string LoggerLevelToString(LoggerLevel level){switch(level){case LoggerLevel::DEBUG:return"Debug";case LoggerLevel::INFO:return"Info";case LoggerLevel::WARNING:return"Warning";case LoggerLevel::ERROR:return"Error";case LoggerLevel::FATAL:return"Fatal";default:return"Unknown";}} std::string GetCurrentTime(){// 获取时间戳time_t timep =time(nullptr);// 把时间戳转化为时间格式structtm currtm;localtime_r(&timep,&currtm);// 转化为字符串char buffer[64];snprintf(buffer,sizeof(buffer),"%4d-%02d-%02d %02d-%02d-%02d", currtm.tm_year +1900, currtm.tm_mon +1, currtm.tm_mday, currtm.tm_hour, currtm.tm_min, currtm.tm_sec);return buffer;} class LogStrategy { public: virtual ~LogStrategy()=default; virtual voidSyncLog(const std::string &logmessage)=0;};// 显示器刷新 class ConsoleLogStrategy : public LogStrategy { public:~ConsoleLogStrategy(){} virtual voidSyncLog(const std::string &logmessage) override {{ LockGuard lockguard(&_lock); std::cout << logmessage << std::endl;}} private: Mutex _lock;};const std::string default_dir_path_name ="log";const std::string default_filename ="test.log";// 文件刷新 class FileLogStrategy : public LogStrategy { public:FileLogStrategy(const std::string dir_path_name = default_dir_path_name,const std::string filename = default_filename):_dir_path_name(dir_path_name),_filename(filename){if(std::filesystem::exists(_dir_path_name)){return;} try { std::filesystem::create_directories(_dir_path_name);}catch(const std::filesystem::filesystem_error &e){ std::cerr << e.what()<<"\r\n";}}~FileLogStrategy(){} virtual voidSyncLog(const std::string &logmessage) override {{ LockGuard lock(&_lock); std::string target = _dir_path_name; target +='/'; target += _filename; std::ofstream out(target.c_str(), std::ios::app);if(!out.is_open()){return;} out << logmessage <<"\n"; out.close();}} private: std::string _dir_path_name; std::string _filename; Mutex _lock;}; class Logger { public:Logger(){}voidEnableConsoleStrategy(){ _strategy = std::make_unique<ConsoleLogStrategy>();}voidEnableFileStrategy(){ _strategy = std::make_unique<FileLogStrategy>();} class LogMessage { public:LogMessage(LoggerLevel level, std::string filename,int line, Logger& logger):_curr_time(GetCurrentTime()),_level(level),_pid(getpid()),_filename(filename),_line(line),_logger(logger){ std::stringstream ss; ss <<"["<< _curr_time <<"] "<<"["<<LoggerLevelToString(_level)<<"] "<<"["<< _pid <<"] "<<"["<< _filename <<"] "<<"["<< _line <<"]"<<" - "; _loginfo = ss.str();} template <typename T> LogMessage &operator<<(const T &info){ std::stringstream ss; ss << info; _loginfo += ss.str();return*this;}~LogMessage(){if(_logger._strategy){ _logger._strategy->SyncLog(_loginfo);}} private: std::string _curr_time;// 时间戳 LoggerLevel _level;// 日志等级pid_t _pid;// 进程pid std::string _filename;// 文件名int _line;// 行号 std::string _loginfo;// 一条合并完成的,完整的日志信息 Logger &_logger;// 提供刷新策略的具体做法}; LogMessage operator()(LoggerLevel level, std::string filename,int line){returnLogMessage(level, filename, line,*this);}~Logger(){} private: std::unique_ptr<LogStrategy> _strategy;}; Logger logger;#defineLOG(level)logger(level,__FILE__,__LINE__)#defineEnableConsoleStrategy() logger.EnableConsoleStrategy()#defineEnableFileStrategy() logger.EnableFileStrategy()

main.cc

#include"ThreadPool.hpp"intmain(){srand((unsignedint)time(nullptr));EnableConsoleStrategy(); std::unique_ptr<ThreadPool<Task>> tq = std::make_unique<ThreadPool<Task>>(10); tq->Start();// sleep(5);int cnt =10;while(cnt--){int x =rand()%10+1;int y =rand()%5+1; Task t(x, y); tq->EnQueue(t);sleep(1);} tq->Stop(); tq->Wait();return0;}

Makefile

threadpool:main.cc g++-o $@ $^-std=c++17-lpthread .PHONY:clean clean: rm -f threadpool 

3. 单例模式

什么是单例模式呢

某些类,只能具有一个对象(实例),就称之为单例

在很多的服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中,此时往往需要一个单例的类来管理这些数据

实现单例的两种方式饿汉方式和懒汉方式

所谓饿汉方式就是吃完饭,立刻洗碗,这就是饿汉方式,因为下一顿吃的时候就可以立即拿着碗吃饭。

懒汉方式就是吃完饭,把碗放下,下次吃的时候再洗碗,这就是懒汉。

饿汉方式实现单例模式

template <typename T> class Singleton {static T data; public:static T*GetInstance(){return&data;}};

该类里面有一个静态成员,静态成员被编译在全局区,程序被编译时就已经有了虚拟地址,在逻辑上可以认为已经有了对象

懒汉实现方式单例模式

template <typename T> class Singleton {static T* inst; public:static T*GetInstance(){if(inst ==NULL){ inst = new T();}return inst;}};

这个类里面有一个静态指针,在编译时也有虚拟地址,但是是静态指针的地址,并不是成员的虚拟地址,而是当调用GetInstance函数时,才会动态开辟空间。这就是懒汉模式。

现在,我们就来将刚才的线程池进行改造,设计出一个单例模式。

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

这个就是单例线程池。但是,这个安全吗?是不安全的,如果是多个线程调用 GetInstance,就有可能创建出多份对象,这是不安全的

所以,我们应该加一把锁,保证多线程调用时也是安全的

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

4. 线程安全和重入问题

线程安全多个线程在访问共享资源时,能够正确的执行,不会相互干扰或破坏彼此的执行结果

重入同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其它的执行流再次进入,称之为重入

到了现在,我们能够知道重入分为两种情况

多线程重入函数

信号导致一个执行流重复进入函数

常见的线程不安全的情况

不保护共享变量的函数

函数状态随着被调用,状态发生变化的函数

返回指向静态变量指针的函数

调用线程不安全函数的函数

常见线程安全的情况

每个线程对全局变量和静态变量只有读取权限,而没有写入权限,一般来说这些线程是安全的

类或者接口对于线程来说都是原子性操作

多个线程之间的切换不会导致该接口的执行结果存在二义性

常见不可重入的情况

调用了 malloc,free函数,因为 malloc函数底层是用全局链表来管理堆的

调用标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构

可重入函数体内使用了静态的数据结构

常见可重入的情况

不使用全局变量和静态变量

不使用 malloc 和new开辟出来的空间

不调用不可重入函数

不返回静态数据和全局数据,所有数据都由函数的调用者提供

使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据

可重入与线程安全的联系函数是可重入的,那就是线程安全的

可重入与线程安全的区别线程安全不一定是可重入的,而可重入函数一定是线程安全的

比如说,主线程(只有一个线程)执行了信号捕捉方法,可因为信号的到来,也执行了信号捕捉方法,如果这个方法内部是加了锁的,那么当第一次进入函数内部申请锁之后,又因为信号的到来再一次申请锁,那不就把主线程挂载起来了吗!本来主线程就没有释放锁,又再一次申请锁,自己把自己挂载起来,不就出问题了,这不就造成死锁问题了吗

二、常见锁概念

1. 死锁

什么是死锁呢

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其它进程所占用不会释放的资源而处于一种永久等待的状态

比如,现在有两个线程A,B,任何一个线程要访问临界资源,都必须同时申请到两把锁。线程A申请到了锁1,线程B申请到了锁2,现在线程A要申请锁2,线程B要申请锁1,这时候就会出现死锁。因为线程A,线程B分别申请了锁1和锁2,但是没有释放自己的锁,所以再一次申请对方的锁就会被阻塞,如果线程A,线程B不释放自己的锁,那么就会一直被阻塞,循环等待。

2. 形成死锁的四个必要条件

.互斥条件:一个资源每次只能被一个执行流使用

.请求与保持条件:一个执行流因请求资源而被阻塞时,对已获得的资源保持不放

比如:线程A申请到了锁1,还想申请锁2,线程B将锁2申请走,还想申请锁1。

.不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能被强行剥夺

比如:线程A申请到了锁,访问临界资源,线程B此刻申请锁就会被阻塞,不会申请成功,OS不能直接把锁给线程B。

.循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

形成死锁必须满足这四个全部条件

3. 如何避免死锁

那么,形成死锁的条件我们已经知道了,该如何避免死锁呢?

只要破坏形成死锁的四个必要条件中的任何一个条件即可

.破坏循环等待条件问题:资源一次性分配,使用超时机制,加锁顺序一致

.避免锁未释放的场景

Read more

OpenClaw Ubuntu 24.04.4 安装指南

OpenClaw部署(Ubuntu 24.04.4 ) 概述 系统要求 * Node.js 22+:安装脚本可自动检测并安装(下文补充手动安装方案); * Ubuntu 24.04.4(本文重点),也支持 macOS/Windows(Windows 推荐 WSL2); * pnpm:仅从源码构建时需要。 安装方法 方法一:推荐安装脚本(一键式) 脚本自动完成 Node.js 检测/安装、CLI 全局安装、启动引导向导,是最省心的方式。 标准安装(含引导) curl -fsSL https://openclaw.ai/install.sh | bash 如下图所示: 安装完后进入设置页面如下图所示:

By Ne0inhk

手把手教你部署Komari监控:轻量级服务器探针搭建全记录

前言 最近在整理手头的几台服务器,一直想找个趁手的监控工具。以前用过Zabbix,功能确实强,但配置起来总觉得有点重;哪吒监控也不错,不过有时候就想换个轻量点的试试。前两天逛GitHub的时候发现了Komari这个项目,一眼就被它的简洁风格吸引了。 Komari是一个用Go语言写的自托管监控工具,最大的特点就是轻量——官方说二进制文件本身只有十几兆,跑起来内存占用也很低。它的界面长得有点像现在流行的“探针”风格,可以直观地看到CPU、内存、磁盘、网络流量这些基础指标,还支持多台服务器统一管理。 我觉得它比较适合两类人:一是想自己掌控数据、不想用第三方监控服务的个人开发者,二是需要快速部署、不喜欢折腾复杂配置的小团队。数据都存在自己的服务器里,没有隐私方面的顾虑。 这篇教程没有任何“高大上”的理论,就是把我自己从零开始部署的步骤一步一步记下来。哪怕你之前没怎么用过Linux,只要会敲命令、能连上服务器,跟着做应该也能跑起来。我会尽量把每步在做什么说清楚,而不是简单地让你“复制粘贴”。 一、准备工作 在正式开始之前,需要先确认几样东西准备好了。 1. 准备一台具备公网IP的

By Ne0inhk
【Linux】Linux 系统的目录结构详解

【Linux】Linux 系统的目录结构详解

👋 大家好,欢迎来到我的技术博客! 📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。 🎯 本文将围绕Linux这个话题展开,希望能为你带来一些启发或实用的参考。 🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获! 文章目录 * 【Linux】Linux 系统的目录结构详解 📁 * 1. Linux 目录结构概述 🌐 * 2. 根目录 (/) —— 系统的起点 🚩 * 3. /bin —— 基本命令的家 🛠️ * 4. /sbin —— 系统管理员的工具箱 🛠️ * 5. /lib 和 /lib64 —— 系统库的家园 📦 * 6. /etc —— 系统配置的中枢 📝 * 7. /home —— 用户的私人天地 🏠 * 8. /usr —— 用户程序的宝库 📚 * 9. /var —— 变化的数据中心 🔄 * 10. /dev —— 设备的入口 🖥️ * 11. /proc —— 进程的虚拟文件系统

By Ne0inhk
Ubuntu 环境安装 之 RabbitMQ 快速入手

Ubuntu 环境安装 之 RabbitMQ 快速入手

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~ 🌱🌱个人主页:奋斗的明志 🌱🌱所属专栏:RabbitMQ 📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。 Ubuntu 环境安装 * 前言 * 一、什么是MQ(消息队列) * MQ多用于分布式系统之间进行通信 * 二、MQ的作用 * 1、异步解耦 * 2、流量削峰 * 3、消息分发 * 4、延迟通知 * 三、为什么选择 RabbitMQ * 1、Kafka * 2、RocketMQ * 3、RabbitMQ * 四、RabbitMQ 快速上手 * 1、Ubuntu 环境安装 * 2、安装Erlang * 3、

By Ne0inhk