一、引言:多线程共享资源的问题
多线程的核心优势是共享进程资源、提高并发效率,但共享资源带来了一个致命问题——:多个线程并发操作共享资源时,由于执行时序不确定,导致最终结果与预期不符。比如多线程售票系统可能卖出负数票,银行转账可能导致余额不一致。
Linux 多线程编程中线程同步与互斥涉及临界资源保护与线程协作机制。文章详细阐述了互斥量(mutex)如何防止数据竞争,条件变量(cond)如何实现有序等待与唤醒。重点介绍了生产者消费者模型的两种实现方案:基于阻塞队列的动态扩容与基于环形队列的固定容量设计,对比了各自的性能与适用场景。此外还涵盖了线程安全与可重入函数的区别,以及死锁产生的条件与预防策略,提供了完整的 C/C++ 代码示例。

多线程的核心优势是共享进程资源、提高并发效率,但共享资源带来了一个致命问题——:多个线程并发操作共享资源时,由于执行时序不确定,导致最终结果与预期不符。比如多线程售票系统可能卖出负数票,银行转账可能导致余额不一致。
线程同步与互斥的核心目标,就是在保证多线程并发的同时,解决竞态条件,确保数据一致性。
举个直观例子:未加保护的售票系统
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100; // 共享资源
void* sell_ticket(void* arg) {
char* id = (char*)arg;
while(1) {
if(ticket > 0) {
usleep(1000); // 模拟业务耗时
printf("%s sells ticket:%d\n", id, ticket);
ticket--; // 非原子操作
} else {
break;
}
}
}
int main() {
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, sell_ticket, (void*)"thread 1");
pthread_create(&t2, NULL, sell_ticket, (void*)"thread 2");
pthread_create(&t3, NULL, sell_ticket, (void*)"thread 3");
pthread_create(&t4, NULL, sell_ticket, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
运行结果:出现负数票
原因:
if (ticket > 0)判断后,线程可能被切换,其他线程继续修改ticket;ticket--对应三条汇编指令(load→update→store),非原子操作,可能被打断。解决思路:互斥保证同一时间只有一个线程访问临界资源,同步保证线程按合理顺序访问。
ticket);if (ticket > 0)到ticket--的代码);swap指令),是互斥的底层基础。互斥量(mutex)是 Linux 提供的核心互斥工具,本质是一把'锁',通过原子操作实现锁的获取与释放,确保临界区的独占访问。
POSIX 线程库提供互斥量相关接口,需包含<pthread.h>,链接时加-lpthread。
| 接口 | 功能 | 关键说明 |
|---|---|---|
pthread_mutex_init | 动态初始化互斥量 | attr=NULL使用默认属性 |
PTHREAD_MUTEX_INITIALIZER | 静态初始化互斥量 | 全局或静态互斥量使用,无需销毁 |
pthread_mutex_lock | 加锁 | 阻塞等待,直到获取锁 |
pthread_mutex_unlock | 解锁 | 必须由加锁线程解锁 |
pthread_mutex_destroy | 销毁互斥量 | 静态初始化的互斥量无需销毁 |
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int ticket = 100;
pthread_mutex_t mutex;
void* sell_ticket(void* arg) {
char* id = (char*)arg;
while(1) {
int current_ticket = 0; // 临界区:只保护共享数据的读写
pthread_mutex_lock(&mutex);
if(ticket <= 0) {
pthread_mutex_unlock(&mutex);
break;
}
current_ticket = ticket;
ticket--;
pthread_mutex_unlock(&mutex);
// 模拟耗时操作
usleep(1000);
printf("%s sells ticket:%d\n", id, current_ticket);
}
return NULL;
}
int main() {
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, sell_ticket, (void*)"thread 1");
pthread_create(&t2, NULL, sell_ticket, (void*)"thread 2");
pthread_create(&t3, NULL, sell_ticket, (void*)"thread 3");
pthread_create(&t4, NULL, sell_ticket, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
运行结果:无负数票,每个线程独占临界区,数据一致。
手动加锁/解锁容易出现'忘记解锁'或'异常时未解锁'的问题,采用 RAII(资源获取即初始化)风格封装锁,利用 C++ 析构函数自动释放锁:
// Lock.hpp
#pragma once
#include <pthread.h>
namespace LockModule {
class Mutex {
public:
Mutex() { pthread_mutex_init(&_mutex, NULL); }
~Mutex() { pthread_mutex_destroy(&_mutex); }
void Lock() { pthread_mutex_lock(&_mutex); }
void Unlock() { pthread_mutex_unlock(&_mutex); }
pthread_mutex_t* GetRawMutex() { return &_mutex; }
private:
Mutex(const Mutex&) = delete; // 禁止拷贝
Mutex& operator=(const Mutex&) = delete;
pthread_mutex_t _mutex;
};
// RAII 锁,构造加锁,析构解锁
class LockGuard {
public:
LockGuard(Mutex& mutex) : _mutex(mutex) { _mutex.Lock(); }
~LockGuard() { _mutex.Unlock(); }
private:
LockGuard(const LockGuard&) = delete;
LockGuard& operator=(const LockGuard&) = delete;
Mutex& _mutex;
};
}
使用改进:
// test.cpp
#include "Lock.hpp"
#include <stdio.h>
#include <unistd.h>
using namespace LockModule;
int ticket = 100;
Mutex mutex;
void* sell_ticket(void* arg) {
char* id = (char*)arg;
while(1) {
// 先获取数据,尽快释放锁
int current_ticket = 0;
{
LockGuard lock(mutex);
if(ticket <= 0) break;
current_ticket = ticket;
ticket--;
}
// 锁在这里就释放了
// 模拟耗时操作
usleep(1000);
printf("%s sells ticket:%d\n", id, current_ticket);
}
return NULL;
}
int main() {
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, sell_ticket, (void*)"thread 1");
pthread_create(&t2, NULL, sell_ticket, (void*)"thread 2");
pthread_create(&t3, NULL, sell_ticket, (void*)"thread 3");
pthread_create(&t4, NULL, sell_ticket, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
互斥量解决了'同一时间只有一个线程访问资源',但无法解决'线程按顺序访问'的问题。例如上面的售票系统输出顺序我们是无法控制的,此时需要条件变量。
条件变量允许线程在某个条件不满足时主动等待,当条件满足时被其他线程唤醒。它本身不保证互斥,需与互斥量配合使用,核心接口如下:
| 接口 | 功能 | 关键说明 |
|---|---|---|
pthread_cond_init | 动态初始化条件变量 | attr=NULL使用默认属性 |
PTHREAD_COND_INITIALIZER | 静态初始化条件变量 | 全局或静态条件变量使用 |
pthread_cond_wait | 等待条件满足 | 1. 自动释放互斥量;2. 被唤醒后重新获取互斥量 |
pthread_cond_signal | 唤醒一个等待线程 | 唤醒等待队列中的一个线程 |
pthread_cond_broadcast | 唤醒所有等待线程 | 避免线程饥饿 |
pthread_cond_destroy | 销毁条件变量 | 静态初始化的无需销毁 |
为什么 pthread_cond_wait 需要互斥量?
pthread_cond_wait内部自动完成'解锁→等待→唤醒后加锁'的原子流程。等待条件:必须用 while 循环,而非 if,避免伪唤醒(线程被唤醒但条件未满足):
pthread_mutex_lock(&mutex);
while(条件不满足) { // 如队列空、队列满
pthread_cond_wait(&cond, &mutex);
}
// 处理临界资源
pthread_mutex_unlock(&mutex);
发送信号:修改条件后发送信号,且需在互斥量保护内:
pthread_mutex_lock(&mutex);
// 修改条件 // 如向队列添加元素、移除元素
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
// Cond.hpp
#pragma once
#include "Lock.hpp"
#include <pthread.h>
namespace CondModule {
using namespace LockModule;
class Cond {
public:
Cond() { pthread_cond_init(&_cond, NULL); }
~Cond() { pthread_cond_destroy(&_cond); }
void Wait(Mutex& mutex) { // 传入互斥量,解耦
pthread_cond_wait(&_cond, mutex.GetRawMutex());
}
void Signal() { pthread_cond_signal(&_cond); }
void Broadcast() { pthread_cond_broadcast(&_cond); }
private:
Cond(const Cond&) = delete;
Cond& operator=(const Cond&) = delete;
pthread_cond_t _cond;
};
}
// test.cpp
#include "Cond.hpp"
#include <stdio.h>
#include <unistd.h>
using namespace LockModule;
using namespace CondModule;
int ticket = 100;
Mutex mutex;
Cond cond; // 当前应该执行的线程 ID(1,2,3,4)
int current_thread_id = 1;
void* sell_ticket(void* arg) {
int my_id = *(int*)arg;
char id_str[20];
sprintf(id_str, "thread %d", my_id);
while(1) {
{
LockGuard lock(mutex);
// 等待轮到自己执行
while(current_thread_id != my_id && ticket > 0) {
cond.Wait(mutex);
}
// 检查票是否卖完
if(ticket <= 0) {
// 通知下一个线程,让它也退出
current_thread_id = (current_thread_id % 4) + 1;
cond.Broadcast(); // 唤醒所有等待的线程
break;
}
// 卖票
int current_ticket = ticket;
ticket--;
// 更新下一个应该执行的线程
current_thread_id = (current_thread_id % 4) + 1;
// 打印售票信息(在锁内打印确保顺序)
printf("%s sells ticket:%d\n", id_str, current_ticket);
// 通知下一个线程
cond.Broadcast(); // 唤醒所有等待的线程
}
// 在锁外模拟耗时操作
usleep(1000);
}
return NULL;
}
int main() {
pthread_t t1, t2, t3, t4;
int thread_ids[4] = {1, 2, 3, 4};
pthread_create(&t1, NULL, sell_ticket, (void*)&thread_ids[0]);
pthread_create(&t2, NULL, sell_ticket, (void*)&thread_ids[1]);
pthread_create(&t3, NULL, sell_ticket, (void*)&thread_ids[2]);
pthread_create(&t4, NULL, sell_ticket, (void*)&thread_ids[3]);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
这样我们就利用了条件变量实现了售票的同步。
生产者消费者模型是同步互斥的典型应用,核心是'解耦生产者和消费者,平衡处理能力',遵循321 原则:
阻塞队列是动态大小的容器,特点:
基于 std::queue+ 互斥量 + 条件变量实现。
#pragma once
#include <queue>
#include "Lock.hpp"
#include "Cond.hpp"
namespace Model {
using namespace LockModule;
using namespace CondModule;
template<typename T>
class BlockQueue {
public:
BlockQueue(int cap = 10) : _cap(cap), _stop(false) {}
~BlockQueue() {
// 析构时停止队列,唤醒所有阻塞线程
Stop();
}
// 停止队列
void Stop() {
LockGuard lock(_mutex);
_stop = true;
// 广播所有条件变量,唤醒所有阻塞线程
_prod_cond.Broadcast();
_cons_cond.Broadcast();
}
// 生产者入队:返回是否成功(如果队列已停止,返回 false)
bool Enqueue(const T& data) {
LockGuard lock(_mutex);
// 队列满且未停止时,阻塞等待
while(_queue.size() >= _cap && !_stop) {
_prod_cond.Wait(_mutex);
}
// 如果队列已停止,直接返回失败
if(_stop) {
return false;
}
// 生产数据
_queue.push(data);
// 唤醒一个等待的消费者(队列非空了)
_cons_cond.Signal();
return true;
}
// 消费者出队:返回是否成功(如果队列已停止且为空,返回 false)
bool Dequeue(T& data) {
LockGuard lock(_mutex);
// 队列为空且未停止时,阻塞等待
while(_queue.empty() && !_stop) {
_cons_cond.Wait(_mutex);
}
// 如果队列已停止且为空,返回失败
if(_stop && _queue.empty()) {
return false;
}
// 消费数据
data = _queue.front();
_queue.pop();
// 唤醒一个等待的生产者(队列未满了)
_prod_cond.Signal();
return true;
}
// 判断队列是否已停止(可选接口)
bool IsStopped() const {
LockGuard lock(_mutex);
return _stop;
}
size_t Size() {
LockGuard lock(_mutex);
return _queue.size();
}
private:
std::queue<T> _queue; // 任务队列
int _cap; // 队列最大容量
Mutex _mutex; // 保护队列的互斥量
Cond _prod_cond; // 生产者条件变量(队列不满)
Cond _cons_cond; // 消费者条件变量(队列不空)
bool _stop; // 退出标志(是否停止队列)
};
}
#include "BlockQueue.hpp" // 包含修复后的阻塞队列实现
#include <pthread.h>
#include <iostream>
#include <unistd.h>
#include <string>
using namespace Model;
using namespace std;
// 全局阻塞队列(容量设为 3,方便观察阻塞效果)
BlockQueue<string> g_task_queue(3);
// 生产者线程函数:生成 10 个任务,之后退出
void* ProducerThread(void* arg) {
const char* producer_name = static_cast<const char*>(arg);
cout << "[" << producer_name << "] 启动,开始生产任务..." << endl;
for(int i = 0; i < 10; ++i) {
// 生成任务(模拟任务数据)
string task = "任务 -" + to_string(i);
// 入队(队列满时会阻塞)
bool ret = g_task_queue.Enqueue(task);
if(!ret) {
cout << "[" << producer_name << "] 队列已停止,停止生产" << endl;
break;
}
cout << "[" << producer_name << "] 生产任务:" << task << " | 队列当前大小:" << g_task_queue.Size() << endl;
// 模拟生产耗时(1 秒/个)
sleep(1);
}
cout << "[" << producer_name << "] 生产完毕,退出线程" << endl;
return nullptr;
}
// 消费者线程函数:持续消费任务,直到队列停止且为空
void* ConsumerThread(void* arg) {
const char* consumer_name = static_cast<const char*>(arg);
cout << "[" << consumer_name << "] 启动,开始消费任务..." << endl;
while(true) {
string task;
// 出队(队列为空时会阻塞)
bool ret = g_task_queue.Dequeue(task);
if(!ret) {
cout << "[" << consumer_name << "] 队列已停止且无任务,退出消费" << endl;
break;
}
cout << "[" << consumer_name << "] 消费任务:" << task << " | 队列当前大小:" << g_task_queue.Size() << endl;
// 模拟消费耗时(2 秒/个,故意比生产慢,触发队列满阻塞生产者)
sleep(2);
}
cout << "[" << consumer_name << "] 消费完毕,退出线程" << endl;
return nullptr;
}
int main() {
pthread_t producer, consumer;
const char* prod_name = "生产者";
const char* cons_name = "消费者";
// 创建生产者和消费者线程
pthread_create(&producer, nullptr, ProducerThread, (void*)prod_name);
pthread_create(&consumer, nullptr, ConsumerThread, (void*)cons_name);
// 主线程等待 20 秒(确保生产者完成所有任务,消费者消费完毕)
cout << "主线程:等待 20 秒后停止队列..." << endl;
sleep(20);
// 停止队列(优雅唤醒阻塞的线程)
cout << "主线程:停止队列" << endl;
g_task_queue.Stop();
// 等待线程退出(回收资源)
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
cout << "主线程:所有线程退出,程序结束" << endl;
return 0;
}
环形队列是固定大小的数组,用模运算实现'环形',特点:
_empty:表示空闲空间数,初始为队列容量;_full:表示已用空间数,初始为 0;P(_empty)→生产→V(_full);P(_full)→消费→V(_empty);互斥量 _prod_mutex/_cons_mutex:保证多个生产者/消费者的互斥访问。
#pragma once
#include <vector>
#include <pthread.h>
#include <semaphore.h>
namespace Model {
template<typename T>
class RingQueue {
public:
RingQueue(int cap = 10) : _cap(cap), _prod_idx(0), _cons_idx(0) {
_queue.resize(_cap);
// 初始化信号量
sem_init(&_empty, 0, _cap); // 0 表示线程间共享
sem_init(&_full, 0, 0);
// 初始化互斥量
pthread_mutex_init(&_prod_mutex, NULL);
pthread_mutex_init(&_cons_mutex, NULL);
}
~RingQueue() {
sem_destroy(&_empty);
sem_destroy(&_full);
pthread_mutex_destroy(&_prod_mutex);
pthread_mutex_destroy(&_cons_mutex);
}
// 生产者入队
void Enqueue(const T& data) {
sem_wait(&_empty); // P 操作:空闲空间 -1,无空间则阻塞
pthread_mutex_lock(&_prod_mutex); // 多个生产者互斥
// 生产数据
_queue[_prod_idx] = data;
_prod_idx = (_prod_idx + 1) % _cap; // 环形索引
pthread_mutex_unlock(&_prod_mutex);
sem_post(&_full); // V 操作:已用空间 +1,唤醒消费者
}
// 消费者出队
bool Dequeue(T& data) {
sem_wait(&_full); // P 操作:已用空间 -1,无数据则阻塞
pthread_mutex_lock(&_cons_mutex); // 多个消费者互斥
// 消费数据
data = _queue[_cons_idx];
_cons_idx = (_cons_idx + 1) % _cap;
pthread_mutex_unlock(&_cons_mutex);
sem_post(&_empty); // V 操作:空闲空间 +1,唤醒生产者
return true;
}
private:
std::vector<T> _queue; // 环形队列数组
int _cap; // 固定容量
int _prod_idx; // 生产者索引
int _cons_idx; // 消费者索引
sem_t _empty; // 空闲空间信号量
sem_t _full; // 已用空间信号量
pthread_mutex_t _prod_mutex; // 生产者互斥量
pthread_mutex_t _cons_mutex; // 消费者互斥量
};
}
测试代码:
#include <iostream>
#include <unistd.h> // 用于 sleep
#include <cstdlib> // 用于 rand、srand
#include <ctime> // 用于 time(设置随机数种子)
#include "RingQueue.hpp" // 你的环形队列头文件
#include "Lock.hpp" // 你的互斥锁头文件
#include "Cond.hpp" // 你的条件变量头文件
using namespace std;
using namespace Model;
// 测试数据量(可调整)
const int TEST_DATA_COUNT = 20;
// 生产者线程函数:向环形队列中写入数据
void* ProducerThread(void* arg) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
srand(time(nullptr)); // 设置随机数种子(仅生产者设置一次即可)
for(int i = 0; i < TEST_DATA_COUNT; ++i) {
// 生成 1~100 的随机数作为测试数据
int data = rand() % 100 + 1;
// 入队
rq->Enqueue(data);
cout << "[生产者] 写入数据:" << data << endl;
// 模拟生产耗时(100~300 毫秒),让测试更直观
usleep(rand() % 200000 + 100000);
}
cout << "[生产者] 数据写入完成,退出线程" << endl;
pthread_exit(nullptr);
}
// 消费者线程函数:从环形队列中读取数据
void* ConsumerThread(void* arg) {
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
int data = 0;
for(int i = 0; i < TEST_DATA_COUNT; ++i) {
// 出队
bool ret = rq->Dequeue(data);
if(ret) {
cout << "[消费者] 读取数据:" << data << endl;
} else {
cout << "[消费者] 出队失败!" << endl;
}
// 模拟消费耗时(150~400 毫秒),与生产耗时错开,体现同步
usleep(rand() % 250000 + 150000);
}
cout << "[消费者] 数据读取完成,退出线程" << endl;
pthread_exit(nullptr);
}
int main() {
// 创建环形队列(容量设为 5,测试环形特性)
RingQueue<int> ring_queue(5);
pthread_t prod_tid, cons_tid;
int ret = 0;
// 创建生产者线程
ret = pthread_create(&prod_tid, nullptr, ProducerThread, &ring_queue);
if(ret != 0) {
cerr << "创建生产者线程失败!错误码:" << ret << endl;
return -1;
}
// 创建消费者线程
ret = pthread_create(&cons_tid, nullptr, ConsumerThread, &ring_queue);
if(ret != 0) {
cerr << "创建消费者线程失败!错误码:" << ret << endl;
return -1;
}
// 等待两个线程结束
ret = pthread_join(prod_tid, nullptr);
if(ret != 0) {
cerr << "等待生产者线程失败!错误码:" << ret << endl;
return -1;
}
ret = pthread_join(cons_tid, nullptr);
if(ret != 0) {
cerr << "等待消费者线程失败!错误码:" << ret << endl;
return -1;
}
cout << "\n===== 单生产者单消费者测试完成 =====" << endl;
return 0;
}
| 特性 | 阻塞队列(动态) | 环形队列(固定) |
|---|---|---|
| 空间大小 | 动态扩容,无固定限制 | 固定大小,需提前预估 |
| 实现方式 | 互斥量 + 条件变量 | 互斥量 + 信号量 |
| 性能 | 扩容时有开销,中等 | 无扩容开销,高性能 |
| 适用场景 | 数据量不确定,追求灵活性 | 数据量稳定,追求高并发 |
线程安全和可重入是多线程编程中容易混淆的概念,需明确区分:
strtok)。malloc/free(依赖全局堆管理);malloc、printf);char* get_str() { static char buf[1024]; return buf; })。| 维度 | 线程安全 | 可重入 |
|---|---|---|
| 关注对象 | 多线程并发访问的资源一致性 | 函数自身的重入安全性 |
| 依赖条件 | 可能依赖互斥锁 | 不依赖任何共享资源 |
| 关系 | 可重入函数一定是线程安全的 | 线程安全函数不一定是可重入的 |
| 示例 | 加锁的全局变量操作 | 仅使用局部变量的函数 |
死锁是多线程编程的常见问题,指多个线程互相持有对方需要的锁,且均不释放,导致永久阻塞。
// 正确:按顺序加锁
pthread_mutex_lock(&lock1);
pthread_mutex_lock(&lock2);
// 操作资源
pthread_mutex_unlock(&lock2);
pthread_mutex_unlock(&lock1);
pthread_mutex_trylock 尝试加锁,失败则回滚:if(pthread_mutex_lock(&lock1)!=0){
return -1;
}
if(pthread_mutex_trylock(&lock2)!=0){
pthread_mutex_unlock(&lock1); // 回滚已加的锁
return -1;
}
pthread_mutex_timedlock,超时则释放锁并重试;
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online