【Linux】基于环形队列的生产消费者模型
基于环形队列的生产消费者模型
一、POSIX信号量
1、概述
在我们进行环形队列的生产消费者模型的学习之前,我们要对前置条件POSIX信号量进行学习,这里的POSIX的信号量与systemV的信号量是几乎一致的,都是用于同步操作,达到无冲突的访问共享资源的目的,只是POSIX信号量的使用要更简单一些,可以用于线程间同步
信号量的本质就是一个计数器,它的本质就是用来描述资源数目的,把资源是否就绪放到了临界区之外,在申请信号量的时候其实已经就是间接在做判断了
2、调用接口
(一)初始化信号量
#include<semaphore.h>intsem_init(sem_t*sem,int pshared,unsignedint value);返回值:成功返回0,失败返回-1sem:指向要初始化的信号量对象的指针pshared:指定信号量的共享属性,如果pshared为 0,表示信号量是进程内共享的,只能在创建它的进程内的多个线程之间使用,如果pshared非 0,表示信号量可以在多个进程之间共享value:指定信号量的初始值,表示可以同时访问共享资源的线程或进程的数量
(二)销毁信号量
#include<semaphore.h>intsem_destroy(sem_t*sem);返回值:成功返回0,失败返回-1sem:指向要销毁的信号量对象的指针
(三)等待信号量
#include<semaphore.h>intsem_wait(sem_t*sem);返回值:成功返回0,失败返回-1sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的
sem_wait 函数执行的是信号量的 P 操作
如果信号量 sem 的值大于 0,sem_wait 会将信号量的值减 1,然后立即返回,调用线程或进程可以继续执行后续代码,意味着该线程或进程成功获取了对共享资源的访问权
如果信号量 sem 的值等于 0,sem_wait 会使调用线程或进程进入阻塞状态,直到信号量的值大于 0 为止。一旦信号量的值变为大于 0,sem_wait 会将信号量的值减 1 并返回,线程或进程继续执行
(四)发布信号量
#include<semaphore.h>intsem_post(sem_t*sem);返回值:成功返回0,失败返回-1sem:指向要操作的信号量对象的指针,这个指针一定要是被初始化过的
sem_post 函数执行的是信号量的 V 操作,会将信号量 sem 的值加 1
如果在调用 sem_post 之前,有其他线程或进程因为调用 sem_wait 而阻塞在该信号量上(即信号量的值为 0),那么在信号量的值加 1 之后,系统会唤醒其中一个阻塞的线程或进程,被唤醒的线程或进程会将信号量的值再减 1 并继续执行后续代码
3、在环形队列中的作用
我们在之前应该都接触过环形队列,在环形队列中,一般我们是需要一个计数器的,或者在环形队列中留出最后一个位置,因为如果没有这些措施,我们就不知道双指针谁在前谁在后了,我们这里使用信号量替代了这个计数器
二、基于环形队列的生产消费者模型
1、理论探究

我们通过数组以及模运算的方式来模拟环状模型,前面的基于阻塞队列的生产消费者模型底层来说是基于容器queue的,其空间可以动态分配,现在是基于固定大小的,基于容器vector
其中生产者关注的是环形队列的空间资源,消费者关心的是环形队列的数据资源,而环形队列中的空间资源+数据资源=全部资源,只要有空间生产者就可以生产数据然后放入,只要有数据消费者就可以取出数据然后加工
2、代码实现
(一)RingQueue.hpp
#pragmaonce#include<iostream>#include<vector>#include<semaphore.h>#include<pthread.h>//环形队列默认容量conststaticint defaultcap =8;//环形队列核心接口:PV操作以及加锁解锁template<classT>classRingQueue{private:voidP(sem_t &sem){sem_wait(&sem);}voidV(sem_t &sem){sem_post(&sem);}voidLock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}voidUnlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public://初始化RingQueue(int cap = defaultcap):ringqueue_(cap),cap_(cap),c_step_(0),p_step_(0){sem_init(&cdata_sem_,0,0);sem_init(&pspace_sem_,0, cap);//生产者消费者的锁pthread_mutex_init(&c_mutex_,nullptr);pthread_mutex_init(&p_mutex_,nullptr);}voidPush(const T &in)// 生产活动{//调用P函数检查队列中是否有可用空间,没有可用空间线程会阻塞P(pspace_sem_);//这里为什么要先P后加锁,下面详谈Lock(p_mutex_); ringqueue_[p_step_]= in;// 位置后移,维持环形特性 p_step_++; p_step_ %= cap_;Unlock(p_mutex_);V(cdata_sem_);}voidPop(T *out)// 消费活动{P(cdata_sem_);Lock(c_mutex_);*out = ringqueue_[c_step_];// 位置后移,维持环形特性 c_step_++; c_step_ %= cap_;Unlock(c_mutex_);V(pspace_sem_);}//析构销毁~RingQueue(){sem_destroy(&cdata_sem_);sem_destroy(&pspace_sem_);pthread_mutex_destroy(&c_mutex_);pthread_mutex_destroy(&p_mutex_);}private: std::vector<T> ringqueue_;// 环形队列的底层实现int cap_;// 队列容量int c_step_;// 消费者下标int p_step_;// 生产者下标 sem_t cdata_sem_;// 队中可用数据资源 sem_t pspace_sem_;// 队中可用空间资源 pthread_mutex_t c_mutex_;// 消费者锁 pthread_mutex_t p_mutex_;// 生产者锁};(二)Task.hpp
任务函数还是上一次的任务
#pragmaonce#include<iostream>#include<string> std::string opers="+-*/%";enum{ DivZero=1, ModZero, Unknown };classTask{public:Task(){}Task(int x,int y,char op):data1_(x),data2_(y),oper_(op),result_(0),exitcode_(0){}voidrun(){switch(oper_){case'+': result_ = data1_ + data2_;break;case'-': result_ = data1_ - data2_;break;case'*': result_ = data1_ * data2_;break;case'/':{if(data2_ ==0) exitcode_ = DivZero;else result_ = data1_ / data2_;}break;case'%':{if(data2_ ==0) exitcode_ = ModZero;else result_ = data1_ % data2_;}break;default: exitcode_ = Unknown;break;}}voidoperator()(){run();} std::string GetResult(){ std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r +="="; r += std::to_string(result_); r +="[code: "; r += std::to_string(exitcode_); r +="]";return r;} std::string GetTask(){ std::string r = std::to_string(data1_); r += oper_; r += std::to_string(data2_); r +="=?";return r;}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;};(三)main.cpp
#include<iostream>#include<pthread.h>#include<unistd.h>#include<ctime>#include"RingQueue.hpp"#include"Task.hpp"usingnamespace std;//这个结构体是方便我们打印的时候查看方便的structThreadData{ RingQueue<Task>*rq;//环形队列 std::string threadname;//线程名字};void*Productor(void*args){ ThreadData *td =static_cast<ThreadData*>(args); RingQueue<Task>*rq = td->rq; std::string name = td->threadname;int len = opers.size();while(true){// 模拟获取数据int data1 =rand()%10+1;usleep(10);int data2 =rand()%10;char op = opers[rand()% len]; Task t(data1, data2, op);// 生产数据 rq->Push(t); cout <<"Productor task done, task is : "<< t.GetTask()<<" who: "<< name << endl;sleep(1);}returnnullptr;}void*Consumer(void*args){ ThreadData *td =static_cast<ThreadData*>(args); RingQueue<Task>*rq = td->rq; std::string name = td->threadname;while(true){// 消费数据 Task t; rq->Pop(&t);// 处理数据t(); cout <<"Consumer get task, task is : "<< t.GetTask()<<" who: "<< name <<" result: "<< t.GetResult()<< endl;}returnnullptr;}intmain(){srand(time(nullptr)); RingQueue<Task>*rq =new RingQueue<Task>(10); pthread_t c[5], p[3];//这里我们为了方便查看,统一用单生产单消费for(int i =0; i <1; i++){ ThreadData *td =newThreadData(); td->rq = rq; td->threadname ="Productor-"+ std::to_string(i);pthread_create(p + i,nullptr, Productor, td);}for(int i =0; i <1; i++){ ThreadData *td =newThreadData(); td->rq = rq; td->threadname ="Consumer-"+ std::to_string(i);pthread_create(c + i,nullptr, Consumer, td);}for(int i =0; i <1; i++){pthread_join(p[i],nullptr);}for(int i =0; i <1; i++){pthread_join(c[i],nullptr);}return0;}
3、PV操作包裹住加解锁操作的原因
在 Pop和Push 函数中,以Push 函数为例,P(pspace_sem_) 和 V(cdata_sem_) 包裹着 Lock(p_mutex_) 和 Unlock(p_mutex_) 这种设计是为了实现更细粒度的同步控制,尽可能减少锁的竞争,以确保线程安全和高效性,下面详细解释其原因:
P(pspace_sem_)在Lock(p_mutex_)之前- 信号量的作用:
pspace_sem_信号量用于表示环形队列中可用的空间资源,P(pspace_sem_)操作会检查信号量的值,如果值大于 0,则将其减 1 并继续执行,如果值为 0,则线程会阻塞,直到有可用空间(即其他线程调用V(pspace_sem_)释放空间) - 避免不必要的加锁:在尝试获取互斥锁之前先检查信号量,可以避免在没有可用空间时加锁,因为如果没有可用空间,即使加了锁也无法进行生产操作,还会导致其他线程无法释放空间,造成资源浪费和性能下降,通过先检查信号量,只有在有可用空间时才去获取互斥锁,减少了锁的竞争,提高了程序的效率
- 信号量的作用:
V(cdata_sem_)在Unlock(p_mutex_)之后- 信号量的通知机制:
cdata_sem_信号量用于表示环形队列中可用的数据资源,V(cdata_sem_)操作会将信号量的值加 1,如果有消费者线程因为等待数据而阻塞,会唤醒其中一个线程 - 避免死锁和数据不一致:在释放互斥锁之后再增加
cdata_sem_信号量的值,可以确保在通知消费者有新数据可用之前,生产者已经完成了对共享资源的修改,并且释放了锁,如果在加锁状态下就增加信号量,可能会导致消费者线程被唤醒后尝试获取锁,但由于生产者还持有锁而无法进入临界区,从而造成死锁或数据不一致的问题
- 信号量的通知机制:
今日分享就到这里啦~