Linux 信号量详解与基于环形队列的生产消费模型实现
Linux 信号量用于控制线程对共享资源的访问,通过 P/V 操作管理计数器实现同步与互斥。信号量接口及初始化销毁流程,结合环形队列实现单生产单消费及多生产多消费模型。利用信号量解决空满判断问题,配合互斥锁保护步长指针,确保多线程环境下数据一致性。总结指出信号量兼具同步与互斥功能,而互斥锁仅负责互斥。

Linux 信号量用于控制线程对共享资源的访问,通过 P/V 操作管理计数器实现同步与互斥。信号量接口及初始化销毁流程,结合环形队列实现单生产单消费及多生产多消费模型。利用信号量解决空满判断问题,配合互斥锁保护步长指针,确保多线程环境下数据一致性。总结指出信号量兼具同步与互斥功能,而互斥锁仅负责互斥。

共享资源作为整体使用时,若需将资源分开局部使用,例如电影院座位分配,需考虑两个问题:放过多的线程进入共享资源空间;没有放过多的线程进入共享资源空间,但不同线程访问了同一份资源。
第二个问题需程序员通过代码维护,第一个问题可通过信号量解决。互斥量本质是一个计数器,对整体共享资源的局部资源数目做计数。线程进入共享资源需要对计数器做减操作(P 操作,本质对资源做申请),退出共享资源需要对计数器做加操作(V 操作,本质对资源做归还)。当计数器小于等于 0 时就不再允许线程进入共享资源。
线程要访问局部资源时要先申请信号量(P 操作),类似买电影票预定座位。信号量本身也是共享资源,其申请和释放操作必须是原子的。
信号量在 semaphore.h 头文件中定义。sem_init 函数参数包括信号量类型 sem_t、设置信号量在进程中使用还是线程中使用(0 表示在线程中使用)、设置信号量计数器的初始值。成功返回 0,失败返回 -1。
使用 sem_destroy 销毁信号量。
使用 sem_wait 对计数器做减操作。
使用 sem_post 对计数器做加操作。
将环形队列用于生产者消费者模型时,有三种主要情形:
利用信号量和环形队列实现单生产单消费模型。生产者关心空余空间资源,消费者关心数据资源。
假设环形队列有 10 个位置,定义两个信号量:space_sem 表示空余空间资源计数器,初始化为 10;data_sem 表示数据资源计数器,初始化为 0。
生产者生产时申请空间信号量(P),生产完成后释放数据信号量(V)。 消费者消费时申请数据信号量(P),消费完成后释放空间信号量(V)。
#include <iostream>
#include <semaphore.h>
class Sem {
public:
Sem(int initnum) : _initnum(initnum) {
sem_init(&_sem, 0, _initnum);
}
void P() {
sem_wait(&_sem);
}
void V() {
sem_post(&_sem);
}
~Sem() {
sem_destroy(&_sem);
}
private:
sem_t _sem;
int _initnum; // 计数器初始值
};
#include "Sem.hpp"
#include <iostream>
#include <vector>
const static int gcap = 5;
template<class T>
class RingQueue {
public:
RingQueue(int cap = gcap) : _cap(cap), _ring_queue(cap), _space_sem(cap), _data_sem(0), _p_step(0), _c_step(0) {}
void Enqueue(const T &in) {
_space_sem.P(); // 进行生产
_ring_queue[_p_step++] = in; // 维护环形队列
_p_step %= _cap;
_data_sem.V();
}
void Pop(T *out) {
_data_sem.P(); // 进行消费
*out = _ring_queue[_c_step++]; // 维护环形队列
_c_step %= _cap;
_space_sem.V();
}
~RingQueue() {}
private:
std::vector<T> _ring_queue; // 临界资源
int _cap; // 环形队列容量
Sem _space_sem; // 空闲空间信号量
Sem _data_sem; // 数据信号量
int _p_step; // 生产位置
int _c_step; // 消费位置
};
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void* consumer(void* args) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while (true) {
int data = 0;
rq->Pop(&data);
std::cout << "消费者消费了一个数据:" << data << std::endl;
sleep(1);
}
}
void* producer(void* args) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
int data = 1;
while (true) {
rq->Enqueue(data);
std::cout << "生产者生产了一个数据:" << data << std::endl;
data++;
}
}
int main() {
RingQueue<int>* rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, (void*)rq);
pthread_create(&p, nullptr, producer, (void*)rq);
(c, );
(p, );
;
}
_data_sem 为零,消费者会在 P 操作处阻塞,只有生产者能运行,变相保证了生产者能互斥访问临界资源。多生产多消费模型中,还需维护好生产者之间和消费者之间的互斥关系。_p_step 和 _c_step 是共享资源,需要保护。此时仅用信号量无法保护好步长,因为只要信号量不为零,同一个 step 位置就可以放进多个生产者或消费者访问,造成数据不一致。
因此需要利用互斥锁。为了兼顾效率,不让生产者和消费者串行访问,需要加两把锁,一把给生产者们使用,一把给消费者们使用。顺序上应先申请信号量再加锁,符合「锁的持有时间压缩到极致」的黄金原则。
#pragma once
#include <pthread.h>
class Mutex {
public:
Mutex() {
pthread_mutex_init(&_lock, nullptr);
}
void Lock() {
pthread_mutex_lock(&_lock);
}
void Unlock() {
pthread_mutex_unlock(&_lock);
}
pthread_mutex_t* Get() {
return &_lock;
}
~Mutex() {
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockGuard {
public:
LockGuard(Mutex* mutexp) : _mutexp(mutexp) {
_mutexp->Lock();
}
~LockGuard() {
_mutexp->Unlock();
}
private:
Mutex* _mutexp;
};
#include "Sem.hpp"
#include "Mutex.hpp"
#include <iostream>
#include <vector>
const static int gcap = 5;
template<class T>
class RingQueue {
public:
RingQueue(int cap = gcap) : _cap(cap), _ring_queue(cap), _space_sem(cap), _data_sem(0), _p_step(0), _c_step(0) {}
void Enqueue(const T &in) {
_space_sem.P();
{
LockGuard lockguard(&_p_lock); // 进行生产
_ring_queue[_p_step++] = in;
_p_step %= _cap;
}
_data_sem.V();
}
void Pop(T *out) {
_data_sem.P();
{
LockGuard lockguard(&_c_lock); // 进行消费
*out = _ring_queue[_c_step++];
_c_step %= _cap;
}
_space_sem.V();
}
~RingQueue() {}
private:
std::vector<T> _ring_queue;
int _cap;
Sem _space_sem;
Sem _data_sem;
_p_step;
_c_step;
Mutex _p_lock;
Mutex _c_lock;
};
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void* consumer(void* args) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while (true) {
int data = 0;
rq->Pop(&data);
std::cout << "消费者消费了一个数据:" << data << std::endl;
sleep(1);
}
}
void* producer(void* args) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
int data = 1;
while (true) {
rq->Enqueue(data);
std::cout << "生产者生产了一个数据:" << data << std::endl;
data++;
}
}
int main() {
RingQueue<int>* rq = new RingQueue<int>();
pthread_t c[2], p[3];
pthread_create(p, nullptr, producer, (void*)rq);
pthread_create(p + , , producer, (*)rq);
(p + , , producer, (*)rq);
(c, , consumer, (*)rq);
(c + , , consumer, (*)rq);
(c[], );
(c[], );
(p[], );
(p[], );
(p[], );
;
}
同步与互斥的关系:互斥是保证临界资源的安全访问,同步是保证线程的执行顺序;信号量可以同时完成同步 + 互斥,互斥锁只能完成互斥。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online