【Linux】生产者-消费者模型及条件变量

【Linux】生产者-消费者模型及条件变量

一、生产者-消费者模型

当多个线程同时操作一份共享数据时,我们会遇到一个非常现实的问题:有的线程负责生产数据,有的线程负责使用数据。如果数据还没准备好,使用端线程就不断去检查、争抢资源,会造成大量无意义的 CPU 消耗;而数据满了,生产端线程还继续写入,又会导致数据错乱。

我们可以用一个很形象的例子来理解:有一个只能容纳一个苹果的盘子,这就是我们的临界区;有一个人往盘子里放苹果,他是生产者;还有三个人从盘子里取苹果,他们是消费者。但关键在于:这几个人都被蒙上了眼睛,而且彼此之间无法交流。

于是就出现了尴尬的局面:生产者不知道盘子里有没有苹果,只能反复伸手去摸、去试探;消费者也不知道盘子里有没有苹果,只能不停过来查看、争抢。大家都在做无意义的尝试,既浪费精力,又可能出现 “盘子空了还在取、盘子满了还在放” 的混乱情况。

这就是典型的:缺少同步、缺少等待 - 通知机制。而我们要介绍的生产者 - 消费者模型,正是为了解决这个问题而生。

可以想到,出现这样的问题最大的原因就是多个线程之间缺少‘配合’,生产端放入了数据但是消费端不知道,消费端取走了数据但是生产端也不知道,盘子容量有限也引起了生产端和消费端内部的竞争,此时就引入了同步等待-通知的核心思想,在上面的模型中,引入一把锁和一个铃铛。

锁的出现就防止了因为竞争产生的安全问题(多拿/多放),铃铛则解决了生产者和消费者之间的交流问题,让整个过程变得有序,消除无意义的访问,让过程更加高效。这整个过程中用到的锁和铃铛就是互斥锁和条件变量。

概括归纳:

总的来说,这个模型需要维护以下关系:

3种关系:消费者与消费者之间互斥、消费者与生产者之间互斥、生产者与生产者之间互斥(多生产者)。

2种角色:消费者与生产者。

1个空间:消费者与生产者进行数据‘交易’的空间。

二、条件变量

概念介绍

条件变量是一种用于多线程 / 多进程同步的机制,它允许线程在某个共享资源的状态不满足预设条件时,主动进入阻塞等待状态,直到其他线程修改了共享资源的状态并发出通知,才会被唤醒并重新检查条件。

相关接口

  • pthread_cond_wait:「死等」—— 只要没有其他线程发通知,线程会一直阻塞在条件变量上,直到被唤醒(或收到信号中断)。
  • pthread_cond_timedwait:「限时等」—— 设定一个截止时间,若到时间仍未收到通知,函数会自动返回,线程被唤醒,避免 “永久卡死”。

这里以pthread_cond_wait这个更为基础的接口展开:

接口原型包含包含两个形参:pthread_mutex_t *和pthread_cond_t *,为什么跟互斥锁也有关系呢?其实很容易想到,条件变量的初衷就是解决多线程对临界资源的处理问题,既然作为临界资源,必然需要互斥锁进行安全问题维护。

其中pthread_cond_t是条件变量对应的类型名称,其初始化和pthread_mutex_t(互斥锁)步骤相同。

pthread_cond_wait()需要配合pthread_cond_signal()/pthread_cond_broadcast()进行使用。

pthread_cond_signal 用于唤醒等待在指定条件变量上的一个线程(通常是队列中首个等待线程),适用于仅需一个线程处理临界资源的场景(如单消费者 / 单生产者)。

pthread_cond_broadcast 用于唤醒等待在指定条件变量上的所有线程,适用于多个线程需同时响应条件满足的场景(如多消费者 / 多生产者)。

执行流程与基本用法

线程在执行到pthread_cond_wait函数的时候会释放钥匙并且加入阻塞等待队列,待其他线程通过pthread_cond_signal/pthread_cond_broadcast唤醒后会接着执行下文。对于程序逻辑就可以概括为以下流程图

整个过程形成逻辑闭环。

代码演示与坑点补充

有了逻辑流程图,就能简单设计一段demo程序,假设一个共享数据data,设置上限为5,当data>0的时候消费者可以进行数据消费(data--),当data为0的时候生产者需要进行数据补充(data++)并且保证data小于上限5。

#include <pthread.h> #include <iostream> #include <unistd.h> #define MaxData 5 pthread_cond_t _getcond = PTHREAD_COND_INITIALIZER; pthread_cond_t _incond = PTHREAD_COND_INITIALIZER; pthread_mutex_t _lock = PTHREAD_MUTEX_INITIALIZER; int data = 0; void *consumer(void *args) { const char *name = static_cast<const char *>(args); while (1) { { pthread_mutex_lock(&_lock); if (data > 0)//bug?? { data--; std::cout << name << "拿走了一个数据,当前数据:" << data << std::endl; } else { std::cout << "当前数据不足,唤醒生产者" << std::endl; pthread_cond_signal(&_incond); pthread_cond_wait(&_getcond, &_lock); } pthread_mutex_unlock(&_lock); sleep(1); } } return nullptr; } void *producer(void *args) { while (1) { { pthread_mutex_lock(&_lock); if (data >= MaxData)//bug?? { pthread_cond_wait(&_incond, &_lock); } data++; std::cout << "我放入了一个数据,当前数据" << data << std::endl; pthread_cond_signal(&_getcond); std::cout << "我唤醒了一个消费者" << std::endl; pthread_mutex_unlock(&_lock); usleep(5000); } } return nullptr; } int main() { pthread_t td1, td2, td3; pthread_create(&td1, nullptr, producer, (void *)"producer"); pthread_create(&td2, nullptr, consumer, (void *)"consumer1"); pthread_create(&td3, nullptr, consumer, (void *)"consumer2"); pthread_join(td1, nullptr); pthread_join(td2, nullptr); pthread_join(td3, nullptr); }

执行后就会出现以下效果,但是代码其实有一个隐藏的bug——当一个线程执行到pthread_cond_wait后会进入阻塞队列进行pthread_cond_signal唤醒等待,等pthread_cond_signal信号到了之后返回运行队列,但是在进入等待队列后,线程可能会出现提前返回的问题,也叫做伪唤醒

1、信号(SINGINT...)导致线程提前返回。

2、调用pthread_cond_broadcast,所有队列线程被唤醒导致多线程之间出现资源竞争,没有得到资源的线程继续执行下文代码导致安全问题。

3、操作系统为了管理阻塞队列主动唤醒某个线程,导致线程提前返回。

以上问题均会导致程序不按照预期执行,那么为了解决这个问题,就需要把if判断语句改为while进行判断,这样,即使出现伪唤醒的问题,线程也会再次进入while执行判断,防止伪唤醒带来安全问题。

三、实战代码——生产者-消费者模型构建与封装

有了上述知识铺垫,我们可以尝试面向对象进行生产者--消费者模型的封装,下面是运行截图以及实战代码:

//Blockqueue.hpp #include <pthread.h> #include <queue> #include <iostream> #define testlen 5 pthread_mutex_t _mutex; pthread_cond_t _cond_empty; pthread_cond_t _cond_full; template <typename T> class _Blockqueue { private: bool IsFull() { return _blockqueue.size() >= _len; } bool IsEmpty() { return _blockqueue.empty(); } public: _Blockqueue() : _csleep(0), _psleep(0), _len(testlen) { pthread_cond_init(&_cond_empty, nullptr); pthread_cond_init(&_cond_full, nullptr); pthread_mutex_init(&_mutex, nullptr); } void Insert(const T &in) { pthread_mutex_lock(&_mutex); while (IsFull()) { _psleep++; pthread_cond_wait(&_cond_full, &_mutex); _psleep--; } _blockqueue.push(in); std::cout << "我唤醒了一个消费者,当前等待消费者:" << _csleep << std::endl; pthread_cond_signal(&_cond_empty); pthread_mutex_unlock(&_mutex); } T Pop() { pthread_mutex_lock(&_mutex); while (IsEmpty()) { _csleep++; pthread_cond_wait(&_cond_empty, &_mutex); _csleep--; } T _get = _blockqueue.front(); if (_psleep > 0) { std::cout << "我唤醒了一个生产者" << std::endl; pthread_cond_signal(&_cond_full); } _blockqueue.pop(); pthread_mutex_unlock(&_mutex); return _get; } ~_Blockqueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond_empty); pthread_cond_destroy(&_cond_full); } private: std::queue<T> _blockqueue; int _csleep; int _psleep; int _len; };
//Task.hpp #include <functional> #include <iostream> #include <vector> #include <time.h> void Download() { std::cout << "我是一个下载任务" << std::endl; } void Upload() { std::cout << "我是一个上传任务" << std::endl; } void Updata() { std::cout << "我是一个更新任务" << std::endl; } std::function<void()> func1 = Download; std::function<void()> func2 = Upload; std::function<void()> func3 = Updata; class Task { private: void init_srand() { srand((unsigned int)time(0)); } public: Task() { tasks.push_back(func1); tasks.push_back(func2); tasks.push_back(func3); } std::function<void()> dispatch() { init_srand(); int _random = rand() % 3; return tasks[_random]; } ~Task() { } private: std::vector<std::function<void()>> tasks; };
/Main.cpp #include "Blockqueue.hpp" #include <iostream> #include <unistd.h> #include "Task.hpp" #include <functional> Task *_task = new Task; void *producer(void *args) { while (true) { _Blockqueue<Task *> *_bq = static_cast<_Blockqueue<Task *> *>(args); _bq->Insert(_task); } return nullptr; } void *consumer(void *args) { while (true) { _Blockqueue<Task *> *_bq = static_cast<_Blockqueue<Task *> *>(args); std::function<void()> _task = _bq->Pop()->dispatch(); std::cout << "我拿到了任务" << std::endl; _task(); sleep(3); } return nullptr; } int main() { _Blockqueue<Task*> *bq = new _Blockqueue<Task*>; pthread_t td1, td2, td3, td4, td5; pthread_create(&td1, nullptr, producer, bq); pthread_create(&td2, nullptr, producer, bq); pthread_create(&td3, nullptr, consumer, bq); pthread_create(&td4, nullptr, consumer, bq); pthread_create(&td5, nullptr, consumer, bq); pthread_join(td1, nullptr); pthread_join(td2, nullptr); pthread_join(td3, nullptr); pthread_join(td4, nullptr); pthread_join(td5, nullptr); }

Read more

内存暴涨700%背后的惊天真相:AI正在吞噬一切!能源·隐私·绿色三大维度深度拆解

内存暴涨700%背后的惊天真相:AI正在吞噬一切!能源·隐私·绿色三大维度深度拆解

🔥作者简介: 一个平凡而乐于分享的小比特,中南民族大学通信工程专业研究生,研究方向无线联邦学习 🎬擅长领域:驱动开发,嵌入式软件开发,BSP开发 ❄️作者主页:一个平凡而乐于分享的小比特的个人主页 ✨收录专栏:未来思考,本专栏结合当前国家战略和实时政治,对未来行业发展的思考 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖💖 🔥内存暴涨700%背后的惊天真相:AI正在吞噬一切!能源·隐私·绿色三大维度深度拆解 |前言| 最近装机的小伙伴们欲哭无泪:DDR5内存价格一路狂飙,部分DRAM现货价格在过去一年暴涨近700% 。大家习惯性吐槽“厂商放火”、“产能不足”,但很少有人看到,这场涨价风暴的真正推手,是那只名为“AI”的巨兽。 当你还在为多花几百块钱买内存心疼时,国家正在西部荒漠建起一座座数据中心,科技巨头正在为“吃电怪兽”抢购每一颗芯片。2026年,大型科技公司的AI相关投资预计将达到6500亿美元,较去年增长约80% 。 今天,我们从能源供应、隐私安全、绿色AI 三个维度,结合东数西算、算电协同、

By Ne0inhk
OpenClaw/MaxClaw/KimiClaw/Molili四大AI Agent横评!

OpenClaw/MaxClaw/KimiClaw/Molili四大AI Agent横评!

2026年爆火的开源AI Agent项目OpenClaw,因配置复杂劝退99%非技术用户,催生了MaxClaw、KimiClaw等云端简化版,以及本土化适配的Molili中文版。 一、四大产品核心定位与基础信息 产品名称 核心定位 开发主体 部署方式 核心优势 核心短板 OpenClaw 开源AI Agent框架("老大哥") 开源社区 本地部署(需技术配置) 功能最强、数据完全自主、生态最丰富 安装复杂、需技术基础、网络配置繁琐 MaxClaw 云端精装版OpenClaw MiniMax 云端一键部署 预置工具丰富、飞书 / 钉钉深度集成、平衡便捷与功能 需订阅会员、数据存云端、功能依赖平台更新 KimiClaw 浏览器轻量版OpenClaw 月之暗面(Kimi) 云端免部署(浏览器 / APP 内使用) 门槛最低、

By Ne0inhk
苹果电脑(macOS)Safari 浏览器开启开发者模式完整指南

苹果电脑(macOS)Safari 浏览器开启开发者模式完整指南

陈牧函 在macOS系统中,Safari浏览器默认隐藏开发者模式,需通过简单设置激活,以便使用 “检查元素”“网络监控”“JS 控制台”等调试工具。以下是分步骤操作方法、进阶功能及常见问题解决方案: 一、基础步骤:开启 “开发” 菜单(核心操作) 所有 macOS 版本通用,是激活开发者模式的核心步骤,必须先完成这一步: 1.打开 Safari 浏览器 点击 Dock 栏中的 Safari 图标,或从 “应用程序” 文件夹中启动 Safari。 2.进入 Safari 设置(偏好设置) 点击屏幕左上角的「Safari」菜单(位于菜单栏最左侧,苹果图标右侧),在下拉菜单中选择「设置」(部分旧版本显示为 “偏好设置”

By Ne0inhk
【Linux/C++多线程篇(一) 】多线程编程入门:从核心概念到常用函数详解

【Linux/C++多线程篇(一) 】多线程编程入门:从核心概念到常用函数详解

⭐️在这个怀疑的年代,我们依然需要信仰。 个人主页:YYYing. ⭐️Linux/C++进阶系列专栏:【从零开始的linux/c++进阶编程】 系列上期内容:【Linux/C++多进程篇(二) 】linux系统编程之进程间通信 (IPC) 系列下期内容:【Linux/C++多线程篇(二) 】同步互斥机制 & C++ 11下的多线程 目录 前言:为什么需要多线程? 多线程基础概念 一、进程与线程的区别 二、进程与线程的关系 三、多线程的优缺点  📖 优点  📖 缺点 多线程编程 一、创建线程:pthread_create  📖 向线程体中传递单个数据  📖 向线程体中传入多个数据 二、线程号的获取:pthread_self 三、

By Ne0inhk