Linux 线程同步
七月 26, 2025
次阅读
条件变量
条件变量是一种线程同步机制,用于在多线程环境中实现线程间的协调和通信。它允许一个线程在某个条件不满足时进入等待状态,直到另一个线程修改了该条件并通知等待的线程。
关键特点
- 与互斥锁配合使用:条件变量通常与互斥锁(mutex)一起使用,以确保对共享资源的互斥访问。
- 等待与通知机制:
wait:线程在条件不满足时释放互斥锁并进入等待状态。notify:当条件改变时,其他线程通过notify_one或notify_all唤醒等待的线程。
- 避免忙等待:条件变量通过让线程休眠来减少不必要的 CPU 资源消耗。
典型应用场景
- 生产者-消费者问题:当队列为空时,消费者线程等待;生产者线程添加数据后通知消费者。
- 任务调度:线程等待特定条件(如资源可用)后再继续执行。
同步概念与竞态条件
同步
同步是指在多线程环境中,通过某种机制确保线程以预期的顺序访问共享资源,从而避免数据不一致或逻辑错误。同步的核心目标是:
- 数据安全:防止多个线程同时修改共享数据导致冲突。
- 有序访问:控制线程的执行顺序,例如某些操作必须在其他操作完成后才能执行。
- 避免饥饿:确保所有线程都能公平地获得资源。
常见同步工具
- 互斥锁(Mutex)
- 信号量(Semaphore)
- 条件变量(Condition Variable)
竞态条件
竞态条件是指程序的输出或行为依赖于不可控的事件时序,通常因多个线程或进程对共享资源的访问顺序不确定而导致。
条件变量函数
pthread_cond_init 与 pthread_cond_destroy
函数原型
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); // 初始化条件变量
int pthread_cond_destroy(pthread_cond_t *cond); // 销毁条件变量
功能
pthread_cond_init:动态初始化一个条件变量(pthread_cond_t),使其可用于线程同步。pthread_cond_destroy:销毁已初始化的条件变量,释放相关资源。
参数
cond(pthread_cond_t *)- 要初始化或销毁的条件变量指针。
attr(const pthread_condattr_t *,仅init使用)- 条件变量属性,通常设为
NULL表示默认属性。
- 条件变量属性,通常设为
返回值
- 成功:返回
0。 - 失败:返回错误码(如
EINVAL表示无效参数)。
示例
#include <pthread.h>
#include <stdio.h>
int main() {
pthread_cond_t cond;
pthread_condattr_t attr;
// 初始化条件变量属性(可选)
pthread_condattr_init(&attr);
// 动态初始化条件变量
if (pthread_cond_init(&cond, &attr) != 0) {
perror("pthread_cond_init failed");
return 1;
}
// 使用条件变量...(通常配合互斥锁和 pthread_cond_wait/signal)
// 销毁条件变量和属性
pthread_cond_destroy(&cond);
pthread_condattr_destroy(&attr);
// 静态初始化示例(无需 destroy)
pthread_cond_t static_cond = PTHREAD_COND_INITIALIZER;
return 0;
}
关键说明
静态初始化:
- 使用宏
PTHREAD_COND_INITIALIZER直接初始化静态分配的条件变量,无需调用destroy。
pthread_cond_t static_cond = PTHREAD_COND_INITIALIZER;- 使用宏
动态初始化:
- 必须成对调用
pthread_cond_init和pthread_cond_destroy,避免资源泄漏。 - 销毁后条件变量不可再使用,除非重新初始化。
- 必须成对调用
线程安全:
- 初始化后的条件变量可被多线程使用,但需配合互斥锁(如
pthread_mutex_t)实现同步。
- 初始化后的条件变量可被多线程使用,但需配合互斥锁(如
错误处理:
- 检查返回值以确保操作成功,尤其在动态初始化时。
注意事项
pthread_cond_destroy调用时机:- 必须在所有线程结束使用条件变量后调用,否则行为未定义。
- 若条件变量是静态初始化的(
PTHREAD_COND_INITIALIZER),不可调用destroy。
pthread_cond_wait
函数原型
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
功能
使调用线程阻塞等待条件变量 cond 被唤醒,同时释放关联的互斥锁 mutex。当线程被唤醒后,该函数会重新获取 mutex 并返回。
参数
cond(pthread_cond_t *)- 指向已初始化的条件变量的指针。
mutex(pthread_mutex_t *)- 指向已锁定的互斥锁的指针,调用前必须由当前线程锁定。
返回值
- 成功:返回
0。 - 失败:返回错误码(如
EINVAL表示参数无效)。
关键行为
原子操作:
- 释放
mutex:在进入等待状态前,自动释放mutex,允许其他线程操作共享数据。 - 重新获取
mutex:被唤醒后,函数返回前会自动重新锁定mutex。
- 释放
虚假唤醒:
- 即使没有其他线程调用
pthread_cond_signal或pthread_cond_broadcast,线程也可能被唤醒。因此,条件判断必须使用while循环而非if。 - 在调用
pthread_cond_broadcast的情况下,所有等待线程都会被唤醒,其中一些线程可能在条件未满足时继续执行,导致竞态条件。
- 即使没有其他线程调用
典型使用模式:
pthread_mutex_lock(&mutex); while (condition_is_false) { // 必须用 while 检查条件 pthread_cond_wait(&cond, &mutex); } // 操作共享数据... pthread_mutex_unlock(&mutex);
示例
#include <pthread.h>
#include <stdio.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_data = 0;
void* consumer(void* arg) {
pthread_mutex_lock(&mutex);
while (shared_data == 0) { // 必须循环检查条件
printf("Consumer: Waiting for data...\n");
pthread_cond_wait(&cond, &mutex);
}
printf("Consumer: Received data %d\n", shared_data);
pthread_mutex_unlock(&mutex);
return NULL;
}
void* producer(void* arg) {
pthread_mutex_lock(&mutex);
shared_data = 42;
printf("Producer: Sending data...\n");
pthread_cond_signal(&cond); // 唤醒等待的线程
pthread_mutex_unlock(&mutex);
return NULL;
}
int main() {
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, consumer, NULL);
pthread_create(&tid2, NULL, producer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
return 0;
}
输出示例
Consumer: Waiting for data...
Producer: Sending data...
Consumer: Received data 42
注意事项
必须持有
mutex:- 调用
pthread_cond_wait前,线程必须已锁定mutex,否则行为未定义。
- 调用
条件检查:
- 必须使用
while循环检查条件,防止虚假唤醒导致逻辑错误。
- 必须使用
性能优化:
- 在复杂场景中,可结合
pthread_cond_broadcast(唤醒所有等待线程)和pthread_cond_signal(唤醒一个线程)选择唤醒策略。
- 在复杂场景中,可结合
pthread_cond_signal 和 pthread_cond_broadcast
函数原型
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒至少一个等待线程
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有等待线程
功能
这两个函数用于唤醒正在等待条件变量 cond 的线程:
pthread_cond_signal:唤醒至少一个等待该条件变量的线程(具体唤醒哪个由系统调度决定)pthread_cond_broadcast:唤醒所有等待该条件变量的线程
参数
cond(pthread_cond_t *):指向已初始化的条件变量的指针
返回值
- 成功返回 0
- 失败返回错误码(如 EINVAL 表示参数无效)
关键区别
| 特性 | pthread_cond_signal | pthread_cond_broadcast |
|---|---|---|
| 唤醒线程数量 | 至少一个 | 所有 |
| 性能影响 | 较低 | 较高 |
| 适用场景 | 单个资源可用 | 多个资源可用/状态改变 |
典型使用模式
pthread_mutex_lock(&mutex);
// 修改共享条件
shared_condition = 1;
// 唤醒等待者
pthread_cond_signal(&cond); // 或 broadcast
pthread_mutex_unlock(&mutex);
示例
#include <pthread.h>
#include <stdio.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int data_ready = 0;
void* worker(void* id) {
pthread_mutex_lock(&mutex);
while (!data_ready) {
pthread_cond_wait(&cond, &mutex);
}
printf("Thread %ld: got data\n", (long)id);
pthread_mutex_unlock(&mutex);
return NULL;
}
int main() {
pthread_t threads[3];
// 创建3个工作线程
for (long i = 0; i < 3; i++) {
pthread_create(&threads[i], NULL, worker, (void*)i);
}
sleep(1); // 确保所有线程都进入等待
pthread_mutex_lock(&mutex);
data_ready = 1;
// pthread_cond_signal(&cond); // 只会唤醒一个线程
pthread_cond_broadcast(&cond); // 会唤醒所有线程
pthread_mutex_unlock(&mutex);
for (int i = 0; i < 3; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
注意事项
调用时机:
- 通常在修改完条件变量关联的共享数据后调用
- 可以在持有或不持有互斥锁时调用,但通常建议在持有锁时调用以避免竞态条件
性能考虑:
signal性能更高,适合只需唤醒一个线程的情况broadcast会产生”惊群效应”,适合需要通知所有等待线程的场景
调用位置:
- 可以在临界区内或外调用,但通常放在临界区内更安全
选择建议
- 当只有一个线程能处理当前条件变化时,使用
signal - 当条件变化允许/需要多个线程响应时,使用
broadcast - 当不确定时,使用
broadcast更安全但性能较低
常见使用场景
signal:生产者-消费者模型(单个资源)broadcast:读写锁实现、屏障同步、多个等待者需要响应状态变化
生产者-消费者模型实现
下面我将完整展示一个基于 POSIX 线程的生产者-消费者模型实现,包含任务定义、阻塞队列和主程序三个部分。
任务定义实现 (Task.hpp)
#ifndef _TASK_HPP_
#define _TASK_HPP_ 1
#include <iostream>
// 计算错误码枚举
enum calc_error {
division_by_zero_error = 1,
operator_error,
};
class Task {
public:
// 构造函数初始化运算数和操作符
Task(int num1, int num2, char op)
: num1_(num1), num2_(num2), op_(op),
result_(0), exitcode_(0) {}
// 执行计算任务
void Run() {
exitcode_ = 0;
switch (op_) {
case '+':
result_ = num1_ + num2_;
break;
case '-':
result_ = num1_ - num2_;
break;
case '*':
result_ = num1_ * num2_;
break;
case '/':
if(num2_ == 0) {
result_ = 0;
exitcode_ = division_by_zero_error;
} else {
result_ = num1_ / num2_;
}
break;
case '%':
if(num2_ == 0){
result_ = 0;
exitcode_ = calc_error::division_by_zero_error;
}
else result_ = num1_ % num2_;
break;
default:
result_ = 0;
exitcode_ = operator_error;
break;
}
}
// 重载函数调用运算符
void operator()() {
Run();
}
// 获取运算结果字符串
std::string GetResult() {
std::string ret = std::to_string(num1_);
ret += op_;
ret += std::to_string(num2_);
ret += "=";
ret += std::to_string(result_);
ret += "[code: ";
ret += std::to_string(exitcode_);
ret += "]";
return ret;
}
// 获取任务描述字符串
std::string GetTask() {
std::string ret = std::to_string(num1_);
ret += op_;
ret += std::to_string(num2_);
ret += "=?";
return ret;
}
private:
int num1_, num2_; // 运算数
char op_; // 操作符
int result_; // 计算结果
int exitcode_; // 错误码
};
#endif
阻塞队列实现 (block_queue.hpp)
#ifndef _BLOCK_QUEUE_CPP_
#define _BLOCK_QUEUE_CPP_ 1
#include <iostream>
#include <queue>
#include <pthread.h>
/**
* @brief 线程安全的阻塞队列模板类
* @tparam T 队列元素类型
*/
template<class T>
class BlockQueue {
public:
/**
* @brief 构造函数
* @param cap 队列容量,默认为10
*/
explicit BlockQueue(u_int cap = 10) : cap_(cap) {
// 初始化互斥锁和条件变量
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&c_cond_, nullptr); // 消费者条件变量
pthread_cond_init(&p_cond_, nullptr); // 生产者条件变量
}
/**
* @brief 析构函数
*/
~BlockQueue() {
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
/**
* @brief 向队列中添加元素
* @param task 要添加的元素
*/
void Push(const T& task) {
pthread_mutex_lock(&mtx_); // 加锁
// 如果队列已满,生产者等待
while(task_queue.size() >= cap_) {
pthread_cond_wait(&p_cond_, &mtx_);
}
task_queue.push(task); // 添加元素到队列
// 通知消费者有数据可取
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mtx_); // 解锁
}
/**
* @brief 从队列中取出元素
* @return 队列首元素
*/
T Pop() {
pthread_mutex_lock(&mtx_); // 加锁
// 如果队列为空,消费者等待
while(task_queue.empty()) {
pthread_cond_wait(&c_cond_, &mtx_);
}
T task = task_queue.front(); // 获取队列首元素
task_queue.pop(); // 移除队列首元素
// 通知生产者有空位可生产
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mtx_); // 解锁
return task;
}
private:
std::queue<T> task_queue; // 底层队列容器
pthread_mutex_t mtx_; // 互斥锁,保护队列操作
pthread_cond_t c_cond_; // 消费者条件变量
pthread_cond_t p_cond_; // 生产者条件变量
u_int cap_; // 队列容量
};
#endif
主程序实现 (cp.cc)
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include "Task.hpp"
#include "block_queue.hpp"
// 全局输出互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
const int consumer_num = 3;
const int producer_num = 5;
char opers[] = {'+', '-', '*', '/', '%'};
// 线程参数结构
struct thread_data {
std::string name;
BlockQueue<Task>* bq;
};
// 生产者线程函数
void* Producer(void* args) {
auto* td = static_cast<thread_data*>(args);
while(true) {
usleep(10000); // 10ms延迟
// 生成随机任务
int num1 = rand() % 10;
int num2 = rand() % 10;
char op = opers[rand() % 5];
Task task{num1, num2, op};
// 同步输出
pthread_mutex_lock(&mutex);
std::cout << td->name << " 生产: " << task.GetTask() << std::endl;
pthread_mutex_unlock(&mutex);
td->bq->Push(task); // 放入队列
sleep(1); // 控制生产速度
}
return nullptr;
}
// 消费者线程函数
void* Consumer(void* args) {
auto* td = static_cast<thread_data*>(args);
while(true) {
usleep(10000); // 10ms延迟
Task task = td->bq->Pop(); // 获取任务
task(); // 执行计算
// 同步输出结果
pthread_mutex_lock(&mutex);
std::cout << td->name << " 消费: " << task.GetResult() << std::endl;
pthread_mutex_unlock(&mutex);
}
return nullptr;
}
int main() {
srand(time(nullptr));
BlockQueue<Task> bq;
std::vector<pthread_t> tids;
// 创建生产者线程
for(int i = 1; i <= producer_num; ++i) {
auto* td = new thread_data{
"生产者-" + std::to_string(i),
&bq
};
pthread_t tid;
pthread_create(&tid, nullptr, Producer, td);
tids.push_back(tid);
}
// 创建消费者线程
for(int i = 1; i <= consumer_num; ++i) {
auto* td = new thread_data{
"消费者-" + std::to_string(i),
&bq
};
pthread_t tid;
pthread_create(&tid, nullptr, Consumer, td);
tids.push_back(tid);
}
// 等待线程结束
for(auto tid : tids) {
pthread_join(tid, nullptr);
}
return 0;
}
程序运行结果示例
生产者-1 生产: 3+7=?
生产者-2 生产: 8-2=?
消费者-1 消费: 3+7=10[code: 0]
消费者-2 消费: 8-2=6[code: 0]
生产者-3 生产: 5*4=?
消费者-3 消费: 5*4=20[code: 0]
生产者-4 生产: 9/3=?
生产者-5 生产: 7%4=?
消费者-1 消费: 9/3=3[code: 0]
消费者-2 消费: 7%4=3[code: 0]
生产者-1 生产: 2/0=?
消费者-3 消费: 2/0=0[code: 1]
生产者-2 生产: 6*3=?
消费者-1 消费: 6*3=18[code: 0]
实现特点分析
线程安全设计:
- 使用互斥锁保护共享队列和标准输出
- 条件变量实现精确的线程唤醒机制
流量控制机制:
- 当队列满时阻塞生产者线程
- 当队列空时阻塞消费者线程
任务处理流程:
- 生产者生成随机算术运算任务
- 消费者执行运算并输出完整结果
- 包含完善的错误处理机制
资源管理:
- 使用 RAII 风格管理互斥锁和条件变量
- 通过对象生命周期管理线程参数
查看评论