【Linux】生产者-消费者模型及条件变量
一、生产者-消费者模型
当多个线程同时操作一份共享数据时,我们会遇到一个非常现实的问题:有的线程负责生产数据,有的线程负责使用数据。如果数据还没准备好,使用端线程就不断去检查、争抢资源,会造成大量无意义的 CPU 消耗;而数据满了,生产端线程还继续写入,又会导致数据错乱。
我们可以用一个很形象的例子来理解:有一个只能容纳一个苹果的盘子,这就是我们的临界区;有一个人往盘子里放苹果,他是生产者;还有三个人从盘子里取苹果,他们是消费者。但关键在于:这几个人都被蒙上了眼睛,而且彼此之间无法交流。
于是就出现了尴尬的局面:生产者不知道盘子里有没有苹果,只能反复伸手去摸、去试探;消费者也不知道盘子里有没有苹果,只能不停过来查看、争抢。大家都在做无意义的尝试,既浪费精力,又可能出现 “盘子空了还在取、盘子满了还在放” 的混乱情况。

这就是典型的:缺少同步、缺少等待 - 通知机制。而我们要介绍的生产者 - 消费者模型,正是为了解决这个问题而生。
可以想到,出现这样的问题最大的原因就是多个线程之间缺少‘配合’,生产端放入了数据但是消费端不知道,消费端取走了数据但是生产端也不知道,盘子容量有限也引起了生产端和消费端内部的竞争,此时就引入了同步和等待-通知的核心思想,在上面的模型中,引入一把锁和一个铃铛。

锁的出现就防止了因为竞争产生的安全问题(多拿/多放),铃铛则解决了生产者和消费者之间的交流问题,让整个过程变得有序,消除无意义的访问,让过程更加高效。这整个过程中用到的锁和铃铛就是互斥锁和条件变量。
概括归纳:
总的来说,这个模型需要维护以下关系:
3种关系:消费者与消费者之间互斥、消费者与生产者之间互斥、生产者与生产者之间互斥(多生产者)。
2种角色:消费者与生产者。
1个空间:消费者与生产者进行数据‘交易’的空间。
二、条件变量
概念介绍
条件变量是一种用于多线程 / 多进程同步的机制,它允许线程在某个共享资源的状态不满足预设条件时,主动进入阻塞等待状态,直到其他线程修改了共享资源的状态并发出通知,才会被唤醒并重新检查条件。
相关接口

pthread_cond_wait:「死等」—— 只要没有其他线程发通知,线程会一直阻塞在条件变量上,直到被唤醒(或收到信号中断)。pthread_cond_timedwait:「限时等」—— 设定一个截止时间,若到时间仍未收到通知,函数会自动返回,线程被唤醒,避免 “永久卡死”。
这里以pthread_cond_wait这个更为基础的接口展开:
接口原型包含包含两个形参:pthread_mutex_t *和pthread_cond_t *,为什么跟互斥锁也有关系呢?其实很容易想到,条件变量的初衷就是解决多线程对临界资源的处理问题,既然作为临界资源,必然需要互斥锁进行安全问题维护。
其中pthread_cond_t是条件变量对应的类型名称,其初始化和pthread_mutex_t(互斥锁)步骤相同。
pthread_cond_wait()需要配合pthread_cond_signal()/pthread_cond_broadcast()进行使用。

pthread_cond_signal 用于唤醒等待在指定条件变量上的一个线程(通常是队列中首个等待线程),适用于仅需一个线程处理临界资源的场景(如单消费者 / 单生产者)。
pthread_cond_broadcast 用于唤醒等待在指定条件变量上的所有线程,适用于多个线程需同时响应条件满足的场景(如多消费者 / 多生产者)。
执行流程与基本用法
线程在执行到pthread_cond_wait函数的时候会释放钥匙并且加入阻塞等待队列,待其他线程通过pthread_cond_signal/pthread_cond_broadcast唤醒后会接着执行下文。对于程序逻辑就可以概括为以下流程图

整个过程形成逻辑闭环。
代码演示与坑点补充
有了逻辑流程图,就能简单设计一段demo程序,假设一个共享数据data,设置上限为5,当data>0的时候消费者可以进行数据消费(data--),当data为0的时候生产者需要进行数据补充(data++)并且保证data小于上限5。
#include <pthread.h> #include <iostream> #include <unistd.h> #define MaxData 5 pthread_cond_t _getcond = PTHREAD_COND_INITIALIZER; pthread_cond_t _incond = PTHREAD_COND_INITIALIZER; pthread_mutex_t _lock = PTHREAD_MUTEX_INITIALIZER; int data = 0; void *consumer(void *args) { const char *name = static_cast<const char *>(args); while (1) { { pthread_mutex_lock(&_lock); if (data > 0)//bug?? { data--; std::cout << name << "拿走了一个数据,当前数据:" << data << std::endl; } else { std::cout << "当前数据不足,唤醒生产者" << std::endl; pthread_cond_signal(&_incond); pthread_cond_wait(&_getcond, &_lock); } pthread_mutex_unlock(&_lock); sleep(1); } } return nullptr; } void *producer(void *args) { while (1) { { pthread_mutex_lock(&_lock); if (data >= MaxData)//bug?? { pthread_cond_wait(&_incond, &_lock); } data++; std::cout << "我放入了一个数据,当前数据" << data << std::endl; pthread_cond_signal(&_getcond); std::cout << "我唤醒了一个消费者" << std::endl; pthread_mutex_unlock(&_lock); usleep(5000); } } return nullptr; } int main() { pthread_t td1, td2, td3; pthread_create(&td1, nullptr, producer, (void *)"producer"); pthread_create(&td2, nullptr, consumer, (void *)"consumer1"); pthread_create(&td3, nullptr, consumer, (void *)"consumer2"); pthread_join(td1, nullptr); pthread_join(td2, nullptr); pthread_join(td3, nullptr); }
执行后就会出现以下效果,但是代码其实有一个隐藏的bug——当一个线程执行到pthread_cond_wait后会进入阻塞队列进行pthread_cond_signal唤醒等待,等pthread_cond_signal信号到了之后返回运行队列,但是在进入等待队列后,线程可能会出现提前返回的问题,也叫做伪唤醒:
1、信号(SINGINT...)导致线程提前返回。
2、调用pthread_cond_broadcast,所有队列线程被唤醒导致多线程之间出现资源竞争,没有得到资源的线程继续执行下文代码导致安全问题。
3、操作系统为了管理阻塞队列主动唤醒某个线程,导致线程提前返回。
以上问题均会导致程序不按照预期执行,那么为了解决这个问题,就需要把if判断语句改为while进行判断,这样,即使出现伪唤醒的问题,线程也会再次进入while执行判断,防止伪唤醒带来安全问题。
三、实战代码——生产者-消费者模型构建与封装
有了上述知识铺垫,我们可以尝试面向对象进行生产者--消费者模型的封装,下面是运行截图以及实战代码:

//Blockqueue.hpp #include <pthread.h> #include <queue> #include <iostream> #define testlen 5 pthread_mutex_t _mutex; pthread_cond_t _cond_empty; pthread_cond_t _cond_full; template <typename T> class _Blockqueue { private: bool IsFull() { return _blockqueue.size() >= _len; } bool IsEmpty() { return _blockqueue.empty(); } public: _Blockqueue() : _csleep(0), _psleep(0), _len(testlen) { pthread_cond_init(&_cond_empty, nullptr); pthread_cond_init(&_cond_full, nullptr); pthread_mutex_init(&_mutex, nullptr); } void Insert(const T &in) { pthread_mutex_lock(&_mutex); while (IsFull()) { _psleep++; pthread_cond_wait(&_cond_full, &_mutex); _psleep--; } _blockqueue.push(in); std::cout << "我唤醒了一个消费者,当前等待消费者:" << _csleep << std::endl; pthread_cond_signal(&_cond_empty); pthread_mutex_unlock(&_mutex); } T Pop() { pthread_mutex_lock(&_mutex); while (IsEmpty()) { _csleep++; pthread_cond_wait(&_cond_empty, &_mutex); _csleep--; } T _get = _blockqueue.front(); if (_psleep > 0) { std::cout << "我唤醒了一个生产者" << std::endl; pthread_cond_signal(&_cond_full); } _blockqueue.pop(); pthread_mutex_unlock(&_mutex); return _get; } ~_Blockqueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond_empty); pthread_cond_destroy(&_cond_full); } private: std::queue<T> _blockqueue; int _csleep; int _psleep; int _len; };//Task.hpp #include <functional> #include <iostream> #include <vector> #include <time.h> void Download() { std::cout << "我是一个下载任务" << std::endl; } void Upload() { std::cout << "我是一个上传任务" << std::endl; } void Updata() { std::cout << "我是一个更新任务" << std::endl; } std::function<void()> func1 = Download; std::function<void()> func2 = Upload; std::function<void()> func3 = Updata; class Task { private: void init_srand() { srand((unsigned int)time(0)); } public: Task() { tasks.push_back(func1); tasks.push_back(func2); tasks.push_back(func3); } std::function<void()> dispatch() { init_srand(); int _random = rand() % 3; return tasks[_random]; } ~Task() { } private: std::vector<std::function<void()>> tasks; };/Main.cpp #include "Blockqueue.hpp" #include <iostream> #include <unistd.h> #include "Task.hpp" #include <functional> Task *_task = new Task; void *producer(void *args) { while (true) { _Blockqueue<Task *> *_bq = static_cast<_Blockqueue<Task *> *>(args); _bq->Insert(_task); } return nullptr; } void *consumer(void *args) { while (true) { _Blockqueue<Task *> *_bq = static_cast<_Blockqueue<Task *> *>(args); std::function<void()> _task = _bq->Pop()->dispatch(); std::cout << "我拿到了任务" << std::endl; _task(); sleep(3); } return nullptr; } int main() { _Blockqueue<Task*> *bq = new _Blockqueue<Task*>; pthread_t td1, td2, td3, td4, td5; pthread_create(&td1, nullptr, producer, bq); pthread_create(&td2, nullptr, producer, bq); pthread_create(&td3, nullptr, consumer, bq); pthread_create(&td4, nullptr, consumer, bq); pthread_create(&td5, nullptr, consumer, bq); pthread_join(td1, nullptr); pthread_join(td2, nullptr); pthread_join(td3, nullptr); pthread_join(td4, nullptr); pthread_join(td5, nullptr); }