Linux 线程同步

Linux 线程同步

七月 26, 2025 次阅读

条件变量

条件变量是一种线程同步机制,用于在多线程环境中实现线程间的协调和通信。它允许一个线程在某个条件不满足时进入等待状态,直到另一个线程修改了该条件并通知等待的线程。

关键特点

  1. 与互斥锁配合使用:条件变量通常与互斥锁(mutex)一起使用,以确保对共享资源的互斥访问。
  2. 等待与通知机制
    • wait:线程在条件不满足时释放互斥锁并进入等待状态。
    • notify:当条件改变时,其他线程通过notify_onenotify_all唤醒等待的线程。
  3. 避免忙等待:条件变量通过让线程休眠来减少不必要的 CPU 资源消耗。

典型应用场景

  • 生产者-消费者问题:当队列为空时,消费者线程等待;生产者线程添加数据后通知消费者。
  • 任务调度:线程等待特定条件(如资源可用)后再继续执行。

同步概念与竞态条件

同步

同步是指在多线程环境中,通过某种机制确保线程以预期的顺序访问共享资源,从而避免数据不一致或逻辑错误。同步的核心目标是:

  1. 数据安全:防止多个线程同时修改共享数据导致冲突。
  2. 有序访问:控制线程的执行顺序,例如某些操作必须在其他操作完成后才能执行。
  3. 避免饥饿:确保所有线程都能公平地获得资源。

常见同步工具

  • 互斥锁(Mutex)
  • 信号量(Semaphore)
  • 条件变量(Condition Variable)

竞态条件

竞态条件是指程序的输出或行为依赖于不可控的事件时序,通常因多个线程或进程对共享资源的访问顺序不确定而导致。

条件变量函数

pthread_cond_initpthread_cond_destroy

函数原型

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);  // 初始化条件变量
int pthread_cond_destroy(pthread_cond_t *cond);                              // 销毁条件变量

功能

  • pthread_cond_init:动态初始化一个条件变量(pthread_cond_t),使其可用于线程同步。
  • pthread_cond_destroy:销毁已初始化的条件变量,释放相关资源。

参数

  1. condpthread_cond_t *
    • 要初始化或销毁的条件变量指针。
  2. attrconst pthread_condattr_t *,仅 init 使用)
    • 条件变量属性,通常设为 NULL 表示默认属性。

返回值

  • 成功:返回 0
  • 失败:返回错误码(如 EINVAL 表示无效参数)。

示例

#include <pthread.h>
#include <stdio.h>

int main() {
    pthread_cond_t cond;
    pthread_condattr_t attr;

    // 初始化条件变量属性(可选)
    pthread_condattr_init(&attr);

    // 动态初始化条件变量
    if (pthread_cond_init(&cond, &attr) != 0) {
        perror("pthread_cond_init failed");
        return 1;
    }

    // 使用条件变量...(通常配合互斥锁和 pthread_cond_wait/signal)

    // 销毁条件变量和属性
    pthread_cond_destroy(&cond);
    pthread_condattr_destroy(&attr);

    // 静态初始化示例(无需 destroy)
    pthread_cond_t static_cond = PTHREAD_COND_INITIALIZER;
    return 0;
}

关键说明

  1. 静态初始化

    • 使用宏 PTHREAD_COND_INITIALIZER 直接初始化静态分配的条件变量,无需调用 destroy
    pthread_cond_t static_cond = PTHREAD_COND_INITIALIZER;
  2. 动态初始化

    • 必须成对调用 pthread_cond_initpthread_cond_destroy,避免资源泄漏。
    • 销毁后条件变量不可再使用,除非重新初始化。
  3. 线程安全

    • 初始化后的条件变量可被多线程使用,但需配合互斥锁(如 pthread_mutex_t)实现同步。
  4. 错误处理

    • 检查返回值以确保操作成功,尤其在动态初始化时。

注意事项

  • pthread_cond_destroy 调用时机
    • 必须在所有线程结束使用条件变量后调用,否则行为未定义。
    • 若条件变量是静态初始化的(PTHREAD_COND_INITIALIZER),不可调用 destroy

pthread_cond_wait

函数原型

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

功能

使调用线程阻塞等待条件变量 cond 被唤醒,同时释放关联的互斥锁 mutex。当线程被唤醒后,该函数会重新获取 mutex 并返回。

参数

  1. condpthread_cond_t *
    • 指向已初始化的条件变量的指针。
  2. mutexpthread_mutex_t *
    • 指向已锁定的互斥锁的指针,调用前必须由当前线程锁定。

返回值

  • 成功:返回 0
  • 失败:返回错误码(如 EINVAL 表示参数无效)。

关键行为

  1. 原子操作

    • 释放 mutex:在进入等待状态前,自动释放 mutex,允许其他线程操作共享数据。
    • 重新获取 mutex:被唤醒后,函数返回前会自动重新锁定 mutex
  2. 虚假唤醒

    • 即使没有其他线程调用 pthread_cond_signalpthread_cond_broadcast,线程也可能被唤醒。因此,条件判断必须使用 while 循环而非 if
    • 在调用 pthread_cond_broadcast 的情况下,所有等待线程都会被唤醒,其中一些线程可能在条件未满足时继续执行,导致竞态条件。
  3. 典型使用模式

    pthread_mutex_lock(&mutex);
    while (condition_is_false) {  // 必须用 while 检查条件
        pthread_cond_wait(&cond, &mutex);
    }
    // 操作共享数据...
    pthread_mutex_unlock(&mutex);

示例

#include <pthread.h>
#include <stdio.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_data = 0;

void* consumer(void* arg) {
    pthread_mutex_lock(&mutex);
    while (shared_data == 0) {  // 必须循环检查条件
        printf("Consumer: Waiting for data...\n");
        pthread_cond_wait(&cond, &mutex);
    }
    printf("Consumer: Received data %d\n", shared_data);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

void* producer(void* arg) {
    pthread_mutex_lock(&mutex);
    shared_data = 42;
    printf("Producer: Sending data...\n");
    pthread_cond_signal(&cond);  // 唤醒等待的线程
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, consumer, NULL);
    pthread_create(&tid2, NULL, producer, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    return 0;
}

输出示例

Consumer: Waiting for data...
Producer: Sending data...
Consumer: Received data 42

注意事项

  1. 必须持有 mutex

    • 调用 pthread_cond_wait 前,线程必须已锁定 mutex,否则行为未定义。
  2. 条件检查

    • 必须使用 while 循环检查条件,防止虚假唤醒导致逻辑错误。
  3. 性能优化

    • 在复杂场景中,可结合 pthread_cond_broadcast(唤醒所有等待线程)和 pthread_cond_signal(唤醒一个线程)选择唤醒策略。

pthread_cond_signalpthread_cond_broadcast

函数原型

int pthread_cond_signal(pthread_cond_t *cond);    // 唤醒至少一个等待线程
int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有等待线程

功能

这两个函数用于唤醒正在等待条件变量 cond 的线程:

  • pthread_cond_signal:唤醒至少一个等待该条件变量的线程(具体唤醒哪个由系统调度决定)
  • pthread_cond_broadcast:唤醒所有等待该条件变量的线程

参数

  • condpthread_cond_t *):指向已初始化的条件变量的指针

返回值

  • 成功返回 0
  • 失败返回错误码(如 EINVAL 表示参数无效)

关键区别

特性 pthread_cond_signal pthread_cond_broadcast
唤醒线程数量 至少一个 所有
性能影响 较低 较高
适用场景 单个资源可用 多个资源可用/状态改变

典型使用模式

pthread_mutex_lock(&mutex);
// 修改共享条件
shared_condition = 1;
// 唤醒等待者
pthread_cond_signal(&cond);  // 或 broadcast
pthread_mutex_unlock(&mutex);

示例

#include <pthread.h>
#include <stdio.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int data_ready = 0;

void* worker(void* id) {
    pthread_mutex_lock(&mutex);
    while (!data_ready) {
        pthread_cond_wait(&cond, &mutex);
    }
    printf("Thread %ld: got data\n", (long)id);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main() {
    pthread_t threads[3];

    // 创建3个工作线程
    for (long i = 0; i < 3; i++) {
        pthread_create(&threads[i], NULL, worker, (void*)i);
    }

    sleep(1);  // 确保所有线程都进入等待

    pthread_mutex_lock(&mutex);
    data_ready = 1;
    // pthread_cond_signal(&cond);  // 只会唤醒一个线程
    pthread_cond_broadcast(&cond);  // 会唤醒所有线程
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < 3; i++) {
        pthread_join(threads[i], NULL);
    }

    return 0;
}

注意事项

  1. 调用时机

    • 通常在修改完条件变量关联的共享数据后调用
    • 可以在持有或不持有互斥锁时调用,但通常建议在持有锁时调用以避免竞态条件
  2. 性能考虑

    • signal 性能更高,适合只需唤醒一个线程的情况
    • broadcast 会产生”惊群效应”,适合需要通知所有等待线程的场景
  3. 调用位置

    • 可以在临界区内或外调用,但通常放在临界区内更安全

选择建议

  • 当只有一个线程能处理当前条件变化时,使用signal
  • 当条件变化允许/需要多个线程响应时,使用broadcast
  • 当不确定时,使用broadcast更安全但性能较低

常见使用场景

  • signal:生产者-消费者模型(单个资源)
  • broadcast:读写锁实现、屏障同步、多个等待者需要响应状态变化

生产者-消费者模型实现

下面我将完整展示一个基于 POSIX 线程的生产者-消费者模型实现,包含任务定义、阻塞队列和主程序三个部分。

任务定义实现 (Task.hpp)

#ifndef _TASK_HPP_
#define _TASK_HPP_ 1

#include <iostream>

// 计算错误码枚举
enum calc_error {
    division_by_zero_error = 1,
    operator_error,
};

class Task {
public:
    // 构造函数初始化运算数和操作符
    Task(int num1, int num2, char op)
        : num1_(num1), num2_(num2), op_(op),
          result_(0), exitcode_(0) {}

    // 执行计算任务
    void Run() {
        exitcode_ = 0;
        switch (op_) {
            case '+':
                result_ = num1_ + num2_;
                break;
            case '-':
                result_ = num1_ - num2_;
                break;
            case '*':
                result_ = num1_ * num2_;
                break;
            case '/':
                if(num2_ == 0) {
                    result_ = 0;
                    exitcode_ = division_by_zero_error;
                } else {
                    result_ = num1_ / num2_;
                }
                break;
            case '%':
              if(num2_ == 0){
                  result_ = 0;
                  exitcode_ = calc_error::division_by_zero_error;
              }
              else result_ = num1_ % num2_;
            break;
            default:
                result_ = 0;
                exitcode_ = operator_error;
                break;
        }
    }

    // 重载函数调用运算符
    void operator()() {
        Run();
    }

    // 获取运算结果字符串
    std::string GetResult() {
        std::string ret = std::to_string(num1_);
        ret += op_;
        ret += std::to_string(num2_);
        ret += "=";
        ret += std::to_string(result_);
        ret += "[code: ";
        ret += std::to_string(exitcode_);
        ret += "]";
        return ret;
    }

    // 获取任务描述字符串
    std::string GetTask() {
        std::string ret = std::to_string(num1_);
        ret += op_;
        ret += std::to_string(num2_);
        ret += "=?";
        return ret;
    }

private:
    int num1_, num2_;    // 运算数
    char op_;            // 操作符
    int result_;         // 计算结果
    int exitcode_;       // 错误码
};

#endif

阻塞队列实现 (block_queue.hpp)

#ifndef _BLOCK_QUEUE_CPP_
#define _BLOCK_QUEUE_CPP_ 1

#include <iostream>
#include <queue>
#include <pthread.h>

/**
 * @brief 线程安全的阻塞队列模板类
 * @tparam T 队列元素类型
 */
template<class T>
class BlockQueue {
public:
    /**
     * @brief 构造函数
     * @param cap 队列容量,默认为10
     */
    explicit BlockQueue(u_int cap = 10) : cap_(cap) {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);  // 消费者条件变量
        pthread_cond_init(&p_cond_, nullptr);  // 生产者条件变量
    }

    /**
     * @brief 析构函数
     */
    ~BlockQueue() {
        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

    /**
     * @brief 向队列中添加元素
     * @param task 要添加的元素
     */
    void Push(const T& task) {
        pthread_mutex_lock(&mtx_);  // 加锁

        // 如果队列已满,生产者等待
        while(task_queue.size() >= cap_) {
            pthread_cond_wait(&p_cond_, &mtx_);
        }

        task_queue.push(task);  // 添加元素到队列

        // 通知消费者有数据可取
        pthread_cond_signal(&c_cond_);

        pthread_mutex_unlock(&mtx_);  // 解锁
    }

    /**
     * @brief 从队列中取出元素
     * @return 队列首元素
     */
    T Pop() {
        pthread_mutex_lock(&mtx_);  // 加锁

        // 如果队列为空,消费者等待
        while(task_queue.empty()) {
            pthread_cond_wait(&c_cond_, &mtx_);
        }

        T task = task_queue.front();  // 获取队列首元素
        task_queue.pop();            // 移除队列首元素

        // 通知生产者有空位可生产
        pthread_cond_signal(&p_cond_);

        pthread_mutex_unlock(&mtx_);  // 解锁

        return task;
    }

private:
    std::queue<T> task_queue;  // 底层队列容器
    pthread_mutex_t mtx_;      // 互斥锁,保护队列操作
    pthread_cond_t c_cond_;    // 消费者条件变量
    pthread_cond_t p_cond_;    // 生产者条件变量
    u_int cap_;               // 队列容量
};

#endif

主程序实现 (cp.cc)

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include "Task.hpp"
#include "block_queue.hpp"

// 全局输出互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

const int consumer_num = 3;
const int producer_num = 5;
char opers[] = {'+', '-', '*', '/', '%'};

// 线程参数结构
struct thread_data {
    std::string name;
    BlockQueue<Task>* bq;
};

// 生产者线程函数
void* Producer(void* args) {
    auto* td = static_cast<thread_data*>(args);
    while(true) {
        usleep(10000);  // 10ms延迟

        // 生成随机任务
        int num1 = rand() % 10;
        int num2 = rand() % 10;
        char op = opers[rand() % 5];
        Task task{num1, num2, op};

        // 同步输出
        pthread_mutex_lock(&mutex);
        std::cout << td->name << " 生产: " << task.GetTask() << std::endl;
        pthread_mutex_unlock(&mutex);

        td->bq->Push(task);  // 放入队列
        sleep(1);  // 控制生产速度
    }
    return nullptr;
}

// 消费者线程函数
void* Consumer(void* args) {
    auto* td = static_cast<thread_data*>(args);
    while(true) {
        usleep(10000);  // 10ms延迟

        Task task = td->bq->Pop();  // 获取任务
        task();  // 执行计算

        // 同步输出结果
        pthread_mutex_lock(&mutex);
        std::cout << td->name << " 消费: " << task.GetResult() << std::endl;
        pthread_mutex_unlock(&mutex);
    }
    return nullptr;
}

int main() {
    srand(time(nullptr));
    BlockQueue<Task> bq;
    std::vector<pthread_t> tids;

    // 创建生产者线程
    for(int i = 1; i <= producer_num; ++i) {
        auto* td = new thread_data{
            "生产者-" + std::to_string(i),
            &bq
        };
        pthread_t tid;
        pthread_create(&tid, nullptr, Producer, td);
        tids.push_back(tid);
    }

    // 创建消费者线程
    for(int i = 1; i <= consumer_num; ++i) {
        auto* td = new thread_data{
            "消费者-" + std::to_string(i),
            &bq
        };
        pthread_t tid;
        pthread_create(&tid, nullptr, Consumer, td);
        tids.push_back(tid);
    }

    // 等待线程结束
    for(auto tid : tids) {
        pthread_join(tid, nullptr);
    }

    return 0;
}

程序运行结果示例

生产者-1 生产: 3+7=?
生产者-2 生产: 8-2=?
消费者-1 消费: 3+7=10[code: 0]
消费者-2 消费: 8-2=6[code: 0]
生产者-3 生产: 5*4=?
消费者-3 消费: 5*4=20[code: 0]
生产者-4 生产: 9/3=?
生产者-5 生产: 7%4=?
消费者-1 消费: 9/3=3[code: 0]
消费者-2 消费: 7%4=3[code: 0]
生产者-1 生产: 2/0=?
消费者-3 消费: 2/0=0[code: 1]
生产者-2 生产: 6*3=?
消费者-1 消费: 6*3=18[code: 0]

实现特点分析

  1. 线程安全设计

    • 使用互斥锁保护共享队列和标准输出
    • 条件变量实现精确的线程唤醒机制
  2. 流量控制机制

    • 当队列满时阻塞生产者线程
    • 当队列空时阻塞消费者线程
  3. 任务处理流程

    • 生产者生成随机算术运算任务
    • 消费者执行运算并输出完整结果
    • 包含完善的错误处理机制
  4. 资源管理

    • 使用 RAII 风格管理互斥锁和条件变量
    • 通过对象生命周期管理线程参数