C++ 异步编程

C++ 异步编程

九月 30, 2025 次阅读

C++ 11 引入了对异步编程的支持,主要通过 std::asyncstd::futurestd::promise 等标准库组件实现。这些工具使得在 C++ 中编写异步代码变得更加简单和直观。

所谓的异步编程,指的是程序在执行某些操作时,不需要等待这些操作完成,而是可以继续执行其他任务。当这些异步操作完成后,程序可以通过回调函数、事件或其他机制来处理结果,就好比一个程序员在编译运行一个较长的任务时,可以同时处理其他任务,任务运行完后会告诉他结果。

一般来说,我们会直接从最基础的 std::async 开始讲起,因为它是最简单的异步编程方式,但它底层本质上就是对 std::futurestd::promise 的封装,我们先将更接近底层的 std::futurestd::promise 讲解完,这样更方便我们理解 C++ 异步编程的原理,不然一开始讲解更高级的接口封装难免会让大家心里一直有疑惑。

std::future 和 std::promise

std::futurestd::promise 是 C++11 引入的两个标准库组件,用于实现异步编程和线程间通信。它们通常一起使用,std::promise 用于在一个线程中设置一个值,而 std::future 用于在另一个线程中获取这个值。

std::future 就是一个消费者,负责获取值,而 std::promise 就是一个生产者,负责设置值。它们之间通过共享状态进行通信。因此, std::promisestd::future 之间通信的关键就在于这个共享状态,不过我们先不急着讲解它,我们先来了解一下 std::promisestd::future 的基本用法。

基本用法

std::promise 是一个模板类,用于在一个线程中设置一个值,这个值可以是任何类型。它提供了一个 set_value 方法,用于设置值,以及一个 set_exception 方法,用于设置异常。
std::future 也是一个模板类,用于在另一个线程中获取这个值。它提供了一个 get 方法,用于获取值,如果值还没有被设置,get 方法会阻塞直到值被设置。

std::promise 设置了一个值后,任何等待这个值的 std::future 都会被通知,并且可以获取这个值。因此,在 std::promise 设置值之前,std::future 处于等待状态,也就是说,std::future 只能在 std::promise 设置值之后才能获取到这个值,否则会被阻塞。

下面是一个简单的例子,演示了如何使用 std::promise 来设置一个值以及如何使用 std::future 来获取这个值:

void basic_example() {
  // 创建 promise-future 对
  std::promise<int> prom;
  std::future<int> fut = prom.get_future();

  // 启动一个线程来设置 promise 的值
  std::thread producer([&prom]() {
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟一些工作
    prom.set_value(42); // 设置 promise 的值
  });

  // 主线程等待 future 的值
  std::cout << "Waiting for the value..." << std::endl;
    int value;
    {
        Timer<std::chrono::seconds> timer; // 计时开始
      value = fut.get(); // 阻塞直到 promise 设置了值
    }
  std::cout << "Received value: " << value << std::endl;

  producer.join(); // 等待生产者线程结束
}

在这个例子中,我们创建了一个 std::promise<int> 对象 prom,并通过 prom.get_future() 获取了一个与之关联的 std::future<int> 对象 fut。然后,我们启动了一个线程,在这个线程中模拟了一些工作(通过 sleep_for),然后调用 prom.set_value(42) 来设置值。
在主线程中,我们调用 fut.get() 来等待并获取这个值。由于 fut.get() 会阻塞直到 prom 设置了值,所以主线程会等待生产者线程完成工作并设置值后,才会继续执行并打印出接收到的值。

代码输出如下:

Waiting for the value...
Elapsed time: 2 s
Received value: 42

正如上面所说,std::promise 不仅可以设置值,还可以设置异常。我们可以使用 set_exception 方法来设置一个异常,这样当 std::future 调用 get 方法时,就会抛出这个异常。下面是一个例子:

void exception_example() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread worker([&prom]() {
        try {
            // 模拟一些可能抛出异常的工作
            throw std::runtime_error("Something went wrong!");
            prom.set_value(100);  // 这行不会执行
        } catch (...) {
            // 捕获异常并传递给 future
            prom.set_exception(std::current_exception());
        }
    });

    try {
        int result = fut.get();
        std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }

    worker.join();
}

在这个例子中,工作线程模拟了一些可能抛出异常的工作,并在捕获到异常后调用 prom.set_exception(std::current_exception()) 将异常传递给 std::future。当主线程调用 fut.get() 时,会抛出这个异常,我们在主线程中捕获并处理它。代码输出如下:

Caught exception: Something went wrong!

std::future 还有一些其他有用的方法,比如 waitwait_for,它们可以用来等待值的设置而不阻塞线程。wait 方法会一直等待直到值被设置,而 wait_for 方法允许我们指定一个超时时间,如果在这个时间内值没有被设置,它会返回一个状态,表示等待超时。比较简单,我们这里就不展开讲解了。

std::future 中有一个方法叫做 valid(),它用于检查 std::future 对象是否与一个有效的共享状态关联。也就是说,如果 std::future 对象是通过 std::promise 创建的,并且还没有被移动或者已经获取了值,那么 valid() 会返回 true。否则,它会返回 false。我们会在共享状态中提及它,届时我们会对这个接口有更深入的理解。

共享状态(shared state)

那么,std::promisestd::future 之间是如何通信的呢?它们之间通过一个共享状态进行通信。这个共享状态包含了 std::promise 设置的值或者异常,以及一个标志,表示值是否已经被设置。 当 std::promise 调用 set_valueset_exception 时,它会更新这个共享状态,并通知所有等待这个值的 std::future。当 std::future 调用 get 方法时,它会检查这个共享状态,如果值已经被设置,它就会返回这个值或者抛出异常;如果值还没有被设置,它就会阻塞,直到值被设置。 这个共享状态是由 C++ 标准库内部管理的,用户不需要直接操作它。 需要注意的是,std::promisestd::future 之间的通信是单向的,也就是说,一个 std::promise 只能与一个 std::future 关联,反之亦然。如果需要多个线程等待同一个值,可以使用 std::shared_future,它允许多个 std::future 实例共享同一个值。

概览

  • shared statestd::promise<T>std::future<T> 之间真正交换值/异常、等待/通知的那块内部数据结构。它通常是一个控制块(control block),由实现以堆上对象的形式保存,供 promise/future/ shared_future 持有引用。
  • promisefuture(或 shared_future持有或引用这块 shared state;valid() 就是用来判断对象当前是否仍然关联到这块 shared state(即“是否有有效的 shared state”)。

典型的 shared_state 内容

以下代码是一段伪代码,展示了一个典型的 shared state 可能包含的内容(实际实现会更复杂且依赖于具体标准库实现):

struct shared_state {
    std::mutex m;
    std::condition_variable cv;   // wait/notify
    bool ready = false;           // 是否已就绪(set_value / set_exception)
    // storage for value T (placement-new) or specialization for void / reference
    std::aligned_storage_t<sizeof(T), alignof(T)> storage;
    std::exception_ptr ex;        // 如果是异常,就放这里
    std::atomic<int> ref_count;   // 引用计数(promise/future/shared_future)
    bool future_retrieved = false; // 是否已经从 promise 调用了 get_future(实现可选的标志)
    // 可能还有:等待线程计数、协程/continuation 列表(实现细节随库而异)
};

生命周期 / 状态迁移(常见操作影响)

  • 创建

    • std::promise<T> p;:通常会创建一个新的 shared state -> p.valid() == true
  • 取 future

    • auto f = p.get_future();f 与该 shared state 关联(只能成功调用一次,否则抛 std::future_error)。p 仍然保持与 shared state 的关联(p.valid() 仍然为 true)。
  • 设置值/异常

    • p.set_value(v)p.set_exception(ep):把值或 std::exception_ptr 写入 shared state,并把 ready = true,然后 notify_all() 唤醒等待的 future::wait()/get()。如果已经设置过,通常抛 std::future_errorpromise_already_satisfied)。
  • future.get()

    • 如果 shared state 里是值,get() 会返回该值(或移动出);如果是异常,get()std::rethrow_exception() 抛出原始异常。对于普通的 std::future调用 get() 后该 future 会释放它与 shared state 的关联,随后 f.valid() 变成 false(不能再次 get)。
    • 对于 std::shared_futureget() 可以多次调用且不会使其失效。
  • 移动构造/赋值

    • promisefuturestd::move 会把 shared state 的“所有权/引用”转给目标对象,源对象变为“空”(valid() == false)。
  • 析构/未满足的 promise

    • 如果 promise 在没有调用 set_value/set_exception 的情况下被析构,通常实现会把 shared state 标记为就绪并存入一个 broken_promise 类型的异常(或等价的 std::future_error),这样之后 future.get() 会抛 std::future_error/broken_promise。这避免 future 永远等待。
  • shared_state 的销毁

    • shared state 本身在所有持有它的句柄(promise、future、shared_future)都被销毁后才销毁(通过引用计数或类似机制)。

valid() 的语义

  • promise::valid() / future::valid()返回该对象当前是否与 shared state 关联(即是否有效)。

  • 常见返回情形:

    • 新创建的 promisetrue(通常有 shared state)。
    • promisestd::move 走之后的源对象:false
    • futureget() 之后(非 shared_future):false(因为它释放了关联)。
    • shared_future 的拷贝仍然 true(它是可复制的、多次使用)。
  • 注意并发valid() 不是万能的线程安全检查器。若同时有另一个线程在移动/销毁该对象,则并发调用 valid() 可能出现数据竞争 —— 要避免并发修改与并发读同一对象,除非额外同步。

常见异常 / 错误(与 shared state 相关)

  • std::future_errorpromise_already_satisfied):set_value / set_exception 被重复调用。
  • std::future_errorno_state):在没有 shared state(valid() == false)上调用诸如 get()set_value() 等操作。
  • std::future_errorbroken_promise):promise 析构但未设置值/异常,future.get() 抛出。

并发 / 实现细节要点

  • 实现一般使用 mutex + condition_variable 或原子加自旋/事件机制,保证:

    • set_value/set_exception 原子地写入结果并标记 ready;
    • future.get() 等待直到 ready,再取值或重新抛出 exception;
    • 适当的内存序(release on writer, acquire on reader)以保证写入的值对 reader 可见。
  • 对于大多数用户代码,不需要关心这些低层细节,只需知道:promise 写,future 读;同步由 shared state 内部保证;不要并发在同一个 promise 上做 conflicting 操作(除非你能保证同步)。

示例

#include <iostream>
#include <future>
#include <thread>

template<typename U>
void show_valid(const char* name, U& obj) {
    std::cout << name << ".valid() = "
              << (obj.valid() ? "true" : "false") << '\n';
}

int main() {
    std::promise<int> p;
    auto f = p.get_future();
    show_valid("f (after get_future)", f); // true

    p.set_value(42);
    std::cout << "f.get() = " << f.get() << '\n'; // 42
    show_valid("f (after get)", f); // false,因为 get 后失效

    // move 行为
    std::promise<int> p2;
    auto f2 = p2.get_future();
    auto f3 = std::move(f2); // 转移 shared state
    show_valid("f2 (moved-from)", f2); // false
    show_valid("f3 (moved-to)", f3);   // true
}

上面程序展示了 valid() 的行为,以及 promisefuture 之间的交互。

总结

  • shared state 是值/异常 + 就绪标志 + 同步原语的集合。
  • promise 写,future 读;写操作会 notify,读操作会等待并读取/重新抛异常。
  • valid() 判断的是“是否关联 shared state”,不是“是否已就绪”。
  • future.get()(普通 future)会在取走值后使该 future 失效(valid() -> false)。
  • promise 析构而未设置值会导致 future.get()broken_promise
  • 切记:不要并发地在同一个 promise/future 上做修改(move/销毁/设置等)而不同步——那是数据竞争。

std::packaged_task

概览

std::packaged_task 是 C++11 引入的一个标准库组件,用于将一个可调用对象(如函数、函数指针、lambda 表达式等)包装成一个任务,并与一个 std::future 关联,以便在将来某个时间点获取该任务的结果。它允许我们将任务的执行与结果的获取分离开来,从而实现异步编程。

std::packaged_task 是一个模板类,接受一个函数签名作为模板参数。它提供了一个 operator() 方法,用于执行包装的可调用对象,并将结果存储在与之关联的 std::future 中。我们可以通过调用 get_future 方法来获取这个 std::future 对象,以便在任务完成后获取结果。也正是因为它提供了 operator() 方法,我们可以将 std::packaged_task 对象传递给线程或线程池来执行任务。

下面是一个简单的例子,演示了如何使用 std::packaged_task 来包装一个函数,并在另一个线程中执行这个任务,然后在主线程中获取结果:

void basic_packaged_task() {
    // 创建一个 packaged_task,包装一个普通函数
    auto simple_function = [](int x, int y) {
        std::cout << "Calculating in thread " << std::this_thread::get_id() << std::endl;
        return x * y;
    };

    // 包装函数,模板参数是函数签名
    std::packaged_task<int(int, int)> task(simple_function);

    // 获取与任务关联的 future
    std::future<int> result = task.get_future();

    // 在另一个线程中执行任务
    std::thread worker(std::move(task), 6, 7);  // 注意:task 需要移动

    // 获取结果
    std::cout << "Result: " << result.get() << std::endl;
    worker.join();
}

在这个例子中,我们首先定义了一个简单的 lambda 函数 simple_function,它接受两个整数参数并返回它们的乘积。然后,我们创建了一个 std::packaged_task<int(int, int)> 对象 task,并将 simple_function 传递给它。接下来,我们调用 task.get_future() 获取与任务关联的 std::future<int> 对象 result,以便在任务完成后获取结果。然后,我们启动了一个线程 worker,并将 task 移动到这个线程中执行,同时传递参数 67。最后,我们在主线程中调用 result.get() 来获取任务的结果,并打印出来。

代码输出如下:

Calculating in thread 99048
Result: 42

它同样可以处理异常。如果包装的函数抛出异常,这个异常会被存储在与之关联的 std::future 中,当我们调用 get 方法时,会重新抛出这个异常。下面是一个例子:

void exception_handling() {
    std::packaged_task<double(double)> task([](double x) -> double {
        if (x < 0) {
            throw std::invalid_argument("Negative value not allowed");
        }
        return std::sqrt(x);
    });

    std::future<double> fut = task.get_future();

    std::thread worker(std::move(task), -1.0);

    try {
        double result = fut.get();
        std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }

    worker.join();
}

在这个例子中,我们创建了一个 std::packaged_task<double(double)> 对象 task,它包装了一个计算平方根的 lambda 函数。如果传递给这个函数的参数是负数,它会抛出一个 std::invalid_argument 异常。我们启动了一个线程 worker,并将 -1.0 传递给任务。然后,在主线程中调用 fut.get() 来获取结果,由于任务抛出了异常,我们在主线程中捕获并处理了这个异常。代码输出如下:

Caught exception: Negative value not allowed

当我们使用 std::packaged_task 时,是不需要手动创建 std::promise 的,因为 std::packaged_task 内部已经封装了一个 std::promise,并且通过 get_future 方法提供了与之关联的 std::future。因此,使用 std::packaged_task 可以简化代码,使得任务的创建和结果的获取更加方便。

std::async

概览

std::async 是 C++11 引入的一个标准库函数,用于启动一个异步任务,并返回一个 std::future 对象,以便在将来某个时间点获取该任务的结果。它允许我们轻松地将函数调用异步化,从而实现并发编程。

在我们了解了 std::futurestd::promisestd::packaged_task 之后,std::async 可以看作是对这些组件的一个高级封装。它简化了异步任务的创建和管理,使得我们不需要手动创建 std::promisestd::packaged_task,而只需调用 std::async 并传递一个可调用对象(如函数、函数指针、lambda 表达式等)以及其参数。

std::async 的基本用法如下:

void basic_async_example() {
    // 简单的异步函数
    auto multiply = [](int a, int b) {
        std::cout << "Async task running in thread: "
            << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return a * b;
        };

    // 启动异步任务 - 最简单的形式
    std::future<int> result = std::async(multiply, 6, 7);

    // 主线程可以继续做其他工作
    std::cout << "Main thread working...\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    // 获取结果(如果需要)
    std::cout << "Result: " << result.get() << std::endl;
}

在这个例子中,我们定义了一个简单的 lambda 函数 multiply,它接受两个整数参数并返回它们的乘积。然后,我们调用 std::async 并传递 multiply 以及参数 67,这会启动一个异步任务,并返回一个与之关联的 std::future<int> 对象 result。主线程可以继续执行其他工作,而不需要等待异步任务完成。最后,我们调用 result.get() 来获取任务的结果,这会阻塞直到任务完成并返回结果。

代码输出如下:

Main thread working...
Async task running in thread: 70252
Result: 42

std::async 还有一个重要的参数 std::launch,它用于指定任务的启动策略。std::launch 有两个选项:

  • std::launch::async:强制任务在一个新的线程中异步执行。
  • std::launch::deferred:任务的执行被延迟,直到调用 get()wait() 方法时才执行,此时任务会在调用线程中执行。
  • std::launch::async | std::launch::deferred(默认值):允许实现选择最合适的策略,可能是异步执行,也可能是延迟执行。

下面我们举一个例子,来演示两种策略的区别:

void launch_policy_compare() {
    auto task = []() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
    };
  // 使用 std::launch::deferred
    {
        auto fut1 = std::async(std::launch::deferred, task);
        std::cout << "Deferred task created.\n";
        std::cout << "Main thread doing other work...\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // 任务不会立即执行,直到调用 get() 或 wait()
        {
            Timer<std::chrono::seconds> timer;
            fut1.wait(); // 现在任务才会执行
        }
        std::cout << "Deferred task completed.\n";
    }
    std::cout << std::endl;
    // 使用 std::launch::async
    {
        auto fut2 = std::async(std::launch::async, task);
        std::cout << "Async task created.\n";
    std::cout << "Main thread doing other work...\n";
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // 任务会立即在新线程中执行
        {
      Timer<std::chrono::seconds> timer;
            fut2.wait(); // 等待任务完成
        }
        std::cout << "Async task completed.\n";
    }
}

在这个例子中,我们定义了一个简单的任务 task,它会睡眠两秒钟。然后,我们分别使用 std::launch::deferredstd::launch::async 启动了两个异步任务:

  • 对于使用 std::launch::deferred 启动的任务,任务不会立即执行,而是等到我们调用 fut1.wait() 时才会执行,这时任务会在调用线程中执行,主线程在做其他任务的时候,任务并没有开始执行。因此,主线程等待任务完成的时间需要两秒钟。
  • 对于使用 std::launch::async 启动的任务,任务会立即在一个新的线程中执行,主线程在做其他任务的时候,任务已经开始执行了。因此,主线程等待任务完成的时间只需要一秒钟。

代码输出如下:

Deferred task created.
Main thread doing other work...
Elapsed time: 2 s
Deferred task completed.

Async task created.
Main thread doing other work...
Elapsed time: 1 s
Async task completed.

使用 std::async 同样可以处理异常。如果异步任务抛出异常,这个异常会被存储在与之关联的 std::future 中,当我们调用 get 方法时,会重新抛出这个异常,使用方法和上面讲解的 std::packaged_task 类似,这里就不再赘述。

实际应用

使用 std::async,我们可以实现一些有趣的程序,我来列举一个生产中常见的应用:异步任务链。

假设我们有一系列需要依次执行的任务,每个任务的输出是下一个任务的输入。我们可以使用 std::async 来实现这个任务链,每个任务都在一个新的线程中异步执行,并且我们可以通过 std::future 来获取每个任务的结果。使用 std::async 实现的异步任务链天然支持异常传播,如果某个任务抛出异常,后续的任务将不会执行,并且异常会被传递到最终的 get 调用处。

void async_chain_example() {
    // 创建数据
    auto create_data = []() -> std::vector<int>{
    std::vector<int> data(100);
    std::iota(data.begin(), data.end(), 1); // 填充数据 1 到 100
        return data;
    };
  // 筛选数据
    auto process_data = [](std::vector<int> data) -> std::vector<int> {
    std::vector<int> filtered;
        std::copy_if(data.begin(), data.end(), std::back_inserter(filtered),
      [](int x) { return x % 2 == 0; }); // 只保留偶数
        return filtered;
  };
  // 汇总数据
    auto aggregate_data = [](std::vector<int> data) -> int {
        return std::accumulate(data.begin(), data.end(), 0); // 求和
  };

    // 链式调用
  auto data_future = std::async(std::launch::async, create_data);
    auto processed_future = std::async(std::launch::async, [data_future = std::move(data_future), process_data]() mutable {
        return process_data(data_future.get());
  });
    auto aggregated_future = std::async(std::launch::async, [processed_future = std::move(processed_future), aggregate_data]() mutable {
        return aggregate_data(processed_future.get());
  });
    // 获取最终结果
  int result = aggregated_future.get();
  std::cout << "Final aggregated result: " << result << std::endl;
}

在这个例子中,我们定义了三个任务:create_data 用于创建一个包含 1 到 100 的整数的向量,process_data 用于筛选出其中的偶数,aggregate_data 用于计算筛选后的偶数的和。然后,我们使用 std::async 来异步执行这些任务,并通过捕获前一个任务的 std::future 来实现任务链。最后,我们调用 aggregated_future.get() 来获取最终的结果。代码输出如下:

Final aggregated result: 2550

std::shared_future

概览

std::shared_future 是 C++11 引入的一个标准库组件,用于实现多个线程共享同一个异步任务的结果。与 std::future 不同,std::shared_future 允许多个 std::shared_future 实例共享同一个结果,这使得我们可以在多个线程中同时获取同一个异步任务的结果,而不需要担心结果被多次获取后失效。与 std::future 一样,std::shared_future 也可以与 std::promisestd::packaged_task 结合使用,以实现异步编程,但 std::shared_future 是可以被复制的,而 std::future 只能被移动。

下面是一个简单的例子,演示了如何使用 std::shared_future 来共享一个异步任务的结果:

void basic_shared_future() {
    // 创建 promise 和 future
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    // 将 future 转换为 shared_future(可以复制)
    std::shared_future<int> shared_fut = fut.share();

    // 创建多个消费者线程
    std::vector<std::thread> consumers;

    for (int i = 0; i < 3; ++i) {
        consumers.emplace_back([i, shared_fut]() {  // 复制 shared_future
            // 每个消费者都可以调用 get()
            int result = shared_fut.get();
            std::cout << "Consumer " << i << " got result: " << result
                      << " in thread " << std::this_thread::get_id() << std::endl;
        });
    }

    // 生产者设置值
    std::thread producer([&prom]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Producer setting value to 100\n";
        prom.set_value(100);
    });

    producer.join();
    for (auto& consumer : consumers) {
        consumer.join();
    }
}

在这个例子中,我们首先创建了一个 std::promise<int> 对象 prom,并通过 prom.get_future() 获取了一个与之关联的 std::future<int> 对象 fut。然后,我们调用 fut.share() 将其转换为一个 std::shared_future<int> 对象 shared_fut,这样我们就可以在多个线程中共享这个结果。接下来,我们创建了三个消费者线程,每个线程都复制了 shared_fut 并调用 get() 方法来获取结果。最后,我们启动了一个生产者线程,在这个线程中模拟了一些工作(通过 sleep_for),然后调用 prom.set_value(100) 来设置值。打印输出如下:

Producer setting value to 100
Consumer 0 got result: 100 in thread 101436
Consumer 2 got result: 100 in thread 101444
Consumer 1 got result: 100 in thread 101440

在这个输出中,我们可以看到生产者线程设置了值 100,然后三个消费者线程都成功地获取到了这个值,并且它们的线程 ID 也不同,说明它们是在不同的线程中执行的。

其实,下面这段代码可以合并成一行代码:

std::future<int> fut = prom.get_future();
// 将 future 转换为 shared_future(可以复制)
std::shared_future<int> shared_fut = fut.share();

我们可以直接通过 std::shared_future 去接收 fut.share() 的返回值:

std::shared_future<int> shared_fut = prom.get_future();

这是因为 std::shared_future 提供了通过 std::future 移动构造函数来创建 std::shared_future 的能力。

我们查询官网,可以看到:

shared_future_constructor

这说明 std::shared_future 可以通过一个 std::future 对象来构造,并且这个构造函数是一个移动构造函数,这意味着我们可以直接将 prom.get_future() 的结果传递给 std::shared_future 的构造函数,而不需要先将其存储在一个临时的 std::future 变量中。需要注意的是,通过 shared() 方法获得的 std::shared_future 对象是通过移动构造函数创建的,因此原始的 std::future 对象在调用 share() 后将变为无效(valid() 返回 false),而 std::shared_future 对象则可以被复制和共享,而该 std::future 对象不再可用。

C++11 异步编程的超时和等待策略

在 C++11 中,异步编程主要通过 std::asyncstd::futurestd::promise 等组件实现。为了更好地控制异步任务的执行和结果获取,C++11 提供了一些超时和等待策略,主要体现在 std::future 类中。以下是一些常见的超时和等待策略:

1. wait() 方法

wait() 方法用于阻塞当前线程,直到与 std::future 关联的异步任务完成。这个方法没有超时机制,它会一直等待,直到任务完成或异常被抛出。

void basic_wait_example() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread worker([&prom]() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "Worker: Setting value to 100\n";
        prom.set_value(100);
    });

    std::cout << "Main: Waiting for result...\n";
    fut.wait();  // 阻塞直到结果就绪
    std::cout << "Main: Got result: " << fut.get() << std::endl;

    worker.join();
}

在这个例子中,主线程调用 fut.wait(),它会阻塞直到工作线程设置了值。同样,如果我们直接调用 fut.get(),它也会阻塞直到结果就绪。

2. wait_for() 方法

wait_for() 方法允许我们指定一个超时时间,等待与 std::future 关联的异步任务完成。如果在指定的时间内任务没有完成,wait_for() 会返回一个状态,表示等待超时。

wait_for() 的返回值是一个 std::future_status 枚举,可能的值有:

  • std::future_status::ready:任务已经完成,可以调用 get() 获取结果。
  • std::future_status::timeout:等待超时,任务还没有完成。
  • std::future_status::deferred:任务被延迟执行(如果使用了 std::launch::deferred 启动策略)。

所以说,我们在批量化管理异步任务超时处理的时候,需要同时管理三种状态,是可以通过 switch 语句来处理的。

void wait_for_example() {
    Timer<std::chrono::seconds> timer;
    auto slow_task = [](int seconds) -> std::string {
        std::this_thread::sleep_for(std::chrono::seconds(seconds));
        return "Task completed after " + std::to_string(seconds) + " seconds";
    };

    // 启动一个需要3秒的任务
    std::future<std::string> fut = std::async(std::launch::async, slow_task, 3);

    std::cout << "Waiting with 2 second timeout...\n";

    // 等待最多2秒
    auto status = fut.wait_for(std::chrono::seconds(2));

    if (status == std::future_status::ready) {
        std::cout << "Success: " << fut.get() << std::endl;
    } else {
        std::cout << "Timeout after 2 seconds. Task still running.\n";
        // 在实际应用中,这里可能需要取消任务或记录日志
        std::cout << "Now waiting indefinitely: " << fut.get() << std::endl;
    }
}

在这个示例中,我们启动了一个需要 3 秒钟完成的异步任务,并使用 wait_for 方法等待最多 2 秒钟。如果任务在 2 秒内没有完成,wait_for 会返回 std::future_status::timeout,我们可以选择继续等待或者采取其他措施。

执行结果如下:

Waiting with 2 second timeout...
Timeout after 2 seconds. Task still running.
Now waiting indefinitely: Task completed after 3 seconds
Elapsed time: 3 s

若我们将任务执行时间改为 1 秒钟,结果会有所不同:

Waiting with 2 second timeout...
Success: Task completed after 1 seconds
Elapsed time: 1 s

可以看到,任务在 2 秒内完成,wait_for 返回 std::future_status::ready,我们成功获取了结果。整个过程持续了 1 秒钟,这说明 wait_for 在任务完成后会立即返回,而不是等到超时。

3. wait_until() 方法

wait_until() 方法允许我们指定一个绝对时间点,等待与 std::future 关联的异步任务完成。如果在指定的时间点之前任务没有完成,wait_until() 会返回一个状态,表示等待超时。

wait_until() 的返回值与 wait_for() 相同,也是一个 std::future_status 枚举。

void wait_until_example() {
    std::promise<void> data_ready;
    std::future<void> ready_future = data_ready.get_future();

    std::thread data_producer([&data_ready]() {
        // 模拟数据处理时间
        std::this_thread::sleep_for(std::chrono::seconds(4));
        std::cout << "Data producer: Data is ready!\n";
        data_ready.set_value();
        });

    // 设置截止时间:当前时间 + 3秒
    auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(3);

    std::cout << "Waiting for data until deadline...\n";
    auto status = ready_future.wait_until(deadline);

    if (status == std::future_status::ready) {
        std::cout << "Data received before deadline!\n";
    }
    else {
        std::cout << "Deadline exceeded! Data not ready in time.\n";
        // 可以触发超时处理逻辑
    }

    data_producer.join();
    if (ready_future.valid()) {
        ready_future.get();  // 清理状态
    }
}

该示例当中,我们创建了一个 std::promise<void> 对象 data_ready,并通过 data_ready.get_future() 获取了一个与之关联的 std::future<void> 对象 ready_future。然后,我们启动了一个数据生产者线程 data_producer,它模拟了一些数据处理时间(4 秒钟),然后调用 data_ready.set_value() 来表示数据已经准备好。

执行结果如下:

Waiting for data until deadline...
Deadline exceeded! Data not ready in time.
Data producer: Data is ready!

综合示例–线程池

下面是一个基于 C++11 标准库实现的简单线程池示例,展示了如何使用 std::threadstd::mutexstd::condition_variablestd::future 来管理和执行异步任务。这个线程池允许我们提交任意返回值和任意参数的任务,并在多个工作线程中并发执行这些任务,同时支持任务结果的异步获取,利用 std::packaged_task 实现任务的封装,从而方便统一管理。

class ThreadPool {
public:
    // 构造函数:创建指定数量的工作线程
    ThreadPool(size_t threads) {
        for(size_t i = 0; i < threads; ++i) {
            // 每个线程执行一个无限循环,从任务队列里取任务
            workers.emplace_back([this] {
                for(;;) {
                    std::function<void()> task;
                    {
                        // 加锁保护任务队列
                        std::unique_lock<std::mutex> lock(this->queue_mutex);

                        // 条件变量等待,直到:
                        // 1) 线程池被停止 (stop == true),或者
                        // 2) 任务队列不为空
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });

                        // 如果线程池停止并且没有任务了,就退出线程
                        if(this->stop && this->tasks.empty())
                            return;

                        // 从任务队列取出一个任务(用 move 避免拷贝)
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // 解锁后执行任务
                    task();
                }
            });
        }
    }

    // 析构函数:等待所有线程完成,安全关闭线程池
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;  // 标记线程池停止
        }
        condition.notify_all(); // 唤醒所有等待的线程,让它们退出
        for(std::thread &worker: workers)
            worker.join(); // 等待所有线程结束
    }

    // 提交任务到线程池,返回一个 future 获取结果
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>>
    {
        using return_type = std::invoke_result_t<F, Args...>;

        // 将函数和参数绑定到一个 packaged_task 中
        // packaged_task 能把函数执行的结果放入 future
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        // 获取 future,用于异步获取返回值
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            // 如果线程池已经停止,不能再提交任务
            if(stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");

            // 将任务包装成无参函数,放入任务队列
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one(); // 唤醒一个等待的工作线程
        return res; // 返回 future 给调用者
    }

private:
    // 线程池成员变量
    std::vector<std::thread> workers;               // 工作线程集合
    std::queue<std::function<void()>> tasks;        // 任务队列

    std::mutex queue_mutex;                         // 队列互斥锁
    std::condition_variable condition;              // 条件变量,用来唤醒工作线程
    bool stop = false;                              // 标记线程池是否关闭
};

线程池的工作流程

1. 初始化(构造函数)

ThreadPool pool(4); // 创建一个有4个工作线程的线程池
  • 构造函数里启动 threads 个工作线程,每个线程都在循环执行:

    1. 等待任务队列里有任务(condition.wait)。
    2. 取出一个任务。
    3. 执行任务。
    4. 再回去等待下一个任务。
  • 如果线程池被关闭 (stop = true) 并且任务队列为空,线程就会退出。

  • 线程池一旦构造好,就有若干个空闲的后台线程在等待任务。


2. 提交任务(enqueue)

auto fut = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);
  • enqueue 接收任意可调用对象(函数、lambda、函数对象等)和参数。

  • 内部用 std::packaged_task 包装任务:

    • packaged_task 能在任务执行后,把结果放到一个 共享状态,供 std::future 获取。
  • 返回一个 std::future,调用者可以用它获取结果:

    int result = fut.get(); // 等待并获取 2 + 3 的结果
  • 任务被加入队列 (tasks.emplace(...))。

  • condition.notify_one() 唤醒一个正在等待的工作线程去执行这个任务。

  • 调用者只需提交任务,线程池会负责挑选一个空闲线程去执行。


3. 工作线程的循环

每个工作线程在后台执行类似这样的逻辑:

for (;;) {
    // 1. 等待任务(条件变量阻塞,直到有任务或线程池关闭)
    wait_for_task();

    // 2. 如果线程池关闭并且没有任务,退出循环
    if (stop && tasks.empty())
        return;

    // 3. 取出队列中的一个任务
    task = tasks.front();
    tasks.pop();

    // 4. 执行任务
    task();
}
  • 线程一直在“取任务 → 执行 → 再取任务”的循环里,直到线程池销毁。

4. 销毁(析构函数)

// ThreadPool 对象生命周期结束时,自动调用析构函数
  • 设置 stop = true,通知所有线程池里的线程该退出了。

  • condition.notify_all():唤醒所有还在等待的线程。

  • 每个工作线程发现 stop == true 且任务队列为空时,就安全退出。

  • 最后,主线程调用 worker.join() 等待所有线程结束,保证线程池资源完全释放。

  • 确保线程池在销毁时不会有线程“悬空”运行。


下面我们通过计算某个数据之间的素数数量来测试一下该线程池的性能:

void thread_pool_example() {
    const size_t thread_num = 4;
  const size_t max_num = 1000000;
  std::cout << "Calculating prime numbers up to " << max_num << " using " << thread_num << " threads.\n";
    ThreadPool pool(thread_num); // 创建一个包含4个线程的线程池
    // 计算 1 到 1000000 之间的素数数量
  size_t range_per_thread = max_num / thread_num;
  std::vector<std::future<size_t>> results;
    auto is_prime = [](size_t n)-> bool {
        if (n <= 1) return false;
        for (size_t i = 2; i * i <= n; ++i) {
            if (n % i == 0) return false;
        }
        return true;
  };
    auto count_primes_in_range = [is_prime](size_t begin, size_t end)-> size_t {
        size_t count = 0;
        for (size_t i = begin; i <= end; ++i) {
            if (is_prime(i)) {
                ++count;
            }
        }
        return count;
  };
    {
    std::cout << "Multi-threaded prime number calculation...\n";
        Timer<> timer;
        for (size_t i = 0; i < thread_num; ++i) {
            size_t begin = i * range_per_thread + 1;
            size_t end = (i == thread_num - 1) ? max_num : (i + 1) * range_per_thread;
            results.emplace_back(pool.enqueue(count_primes_in_range, begin, end));
        }
        size_t total_primes = 0;
        for (auto& res : results) {
            total_primes += res.get();
        }
        std::cout << "Total prime numbers between 1 and " << max_num << ": " << total_primes << std::endl;
    }

    {
    std::cout << "Single-threaded prime number calculation for comparison...\n";
    Timer<> timer;
    size_t total_primes = count_primes_in_range(1, max_num);
    std::cout << "Total prime numbers between 1 and " << max_num << ": " << total_primes << std::endl;
    }
    
}

在这个例子中,我们创建了一个包含 4 个工作线程的线程池,并将计算 1 到 1,000,000 之间素数数量的任务分配给这些线程。每个线程负责计算一个范围内的素数数量,最后我们汇总所有线程的结果。为了对比,我们还实现了一个单线程版本的素数计算。

运行结果如下:

Calculating prime numbers up to 1000000 using 4 threads.
Multi-threaded prime number calculation...
Total prime numbers between 1 and 1000000: 78498
Elapsed time: 53 ms
Single-threaded prime number calculation for comparison...
Total prime numbers between 1 and 1000000: 78498
Elapsed time: 154 ms

可以看到,多线程版本的计算时间明显少于单线程版本,展示了线程池在处理 CPU 密集型任务时的优势。