C++并发编程系列总结-并发同步

C++并发编程系列:关于并发同步的总结

Posted by YuCong on March 9, 2021

C++ 并发编程系列,关于并发同步的总结。


Created 2021.02.15 by William Yu; Last modified: 2022.09.16-v1.1.0

Contact: windmillyucong@163.com

Copyleft! 2022 William Yu. Some rights reserved.


并发同步总结

API 操作 说明
condition_variable
condition_variable_any
- wait() - 等待条件满足
- wait_for() - 等待条件,带超时
- wait_until() - 等待条件到指定时间点
- notify_one() - 通知一个等待线程
- notify_all() - 通知所有等待线程
用于线程间的条件等待和通知机制,需要配合mutex使用
async - async(launch::async, func) - 强制异步执行
- async(launch::deferred, func) - 延迟执行
- async(launch::async|deferred, func) - 自动选择
异步运行一个函数,并返回保有其结果的std::future
可指定不同的启动策略
future - get() - 获取结果(仅能调用一次)
- wait() - 等待结果准备就绪
- wait_for() - 限时等待
- wait_until() - 等待至指定时间点
- valid() - 检查是否有效
等待被异步设置的值
只能移动,不能复制
通常作为异步操作的结果获取接口
packaged_task - get_future() - 获取关联的future
- operator() - 执行任务
- valid() - 检查是否有效
- reset() - 重置任务状态
打包一个函数,存储其返回值以进行异步获取
可重复使用的任务包装器
promise - get_future() - 获取关联的future
- set_value() - 设置值
- set_exception() - 设置异常
- set_value_at_thread_exit() - 线程退出时设置值
存储一个值以进行异步获取
通常用于线程间的单次数据传递
shared_future - get() - 获取结果(可多次调用)
- wait() - 等待结果准备就绪
- wait_for() - 限时等待
- wait_until() - 等待至指定时间点
- valid() - 检查是否有效
等待被异步设置的值
可以复制和共享
多个线程可以同时访问结果

1. 条件变量 condition_variable

  • 当某个条件满足时,以”信号量”的方式唤醒因为该条件而被阻塞的线程
  • 最为常见的使用场景就是: 在线程池中,起初没有任务时任务队列为空,此时线程池中的线程因为”任务队列为空”这个条件处于阻塞状态。一旦有任务进来,就会以信号量的方式唤醒一个线程来处理这个任务。

头文件:

1
#include <condition_variable>

类型:

  • std::condition_variable(只和std::mutex一起工作)
  • std::condition_variable_any(符合类似互斥元的最低标准的任何东西一起工作)

主要成员函数:

  • wait(): 阻塞当前线程,直到条件变量被唤醒
  • wait_for(): 阻塞当前线程,直到条件变量被唤醒或超时
  • wait_until(): 阻塞当前线程,直到条件变量被唤醒或到达指定时间点
  • notify_one(): 唤醒一个等待的线程
  • notify_all(): 唤醒所有等待的线程

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex Mtx;
std::condition_variable Cv;
std::queue<int> DataQueue;
bool Done = false;

// 生产者线程
void Producer() {
    for (int i = 0; i < 5; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::lock_guard<std::mutex> Lock(Mtx);
        DataQueue.push(i);
        std::cout << "Produced: " << i << std::endl;
        Cv.notify_one();  // 通知消费者
    }
    
    std::lock_guard<std::mutex> Lock(Mtx);
    Done = true;
    Cv.notify_all();  // 通知所有消费者结束
}

// 消费者线程
void Consumer() {
    while (true) {
        std::unique_lock<std::mutex> Lock(Mtx);
        // 等待直到队列非空或Done为true
        Cv.wait(Lock, []{ return !DataQueue.empty() || Done; });
        
        if (Done && DataQueue.empty()) {
            break;
        }
        
        int Value = DataQueue.front();
        DataQueue.pop();
        std::cout << "Consumed: " << Value << std::endl;
    }
}

int main() {
    std::thread ProducerThread(Producer);
    std::thread ConsumerThread(Consumer);
    
    ProducerThread.join();
    ConsumerThread.join();
    
    return 0;
}

这个示例展示了条件变量的典型用法:

  1. 生产者线程向队列中添加数据,并通过notify_one()通知消费者
  2. 消费者线程使用wait()等待数据,直到队列非空或收到结束信号
  3. 使用unique_lock而不是lock_guard,因为wait()需要能够解锁和重新锁定互斥量

注意事项:

  1. 条件变量通常与互斥锁配合使用
  2. 使用wait()时应该总是检查条件,避免虚假唤醒
  3. 在修改共享数据时应该持有锁
  4. 在调用notify_one()notify_all()之前应该确保数据已经准备好

2. future

std::future 是一个模板类,用于访问异步操作的结果。它提供了一种机制来获取异步任务的返回值。

主要特点:

  • 只能移动,不能复制
  • 通过 get() 获取结果,如果结果还没准备好会阻塞
  • 通常与 asyncpromisepackaged_task 配合使用

主要成员函数:

  • get(): 获取存储的值或抛出存储的异常
  • wait(): 阻塞直到结果可用
  • wait_for(): 等待指定的时间段
  • wait_until(): 等待直到指定的时间点
  • valid(): 检查future是否有效

3. async

std::async 是一个函数模板,用于异步执行任务。它提供了一种简单的方式来启动异步任务并获取其结果。

启动策略:

  • std::launch::async: 保证异步执行
  • std::launch::deferred: 延迟执行直到调用get()或wait()
  • std::launch::async | std::launch::deferred: 由系统决定(默认)

使用示例:

1
2
3
4
5
auto future = std::async(std::launch::async, []() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
});
int result = future.get();  // 等待任务完成并获取结果

4. promise

std::promise 提供了一种方式来存储一个值或异常,供将来异步获取。它通常用于在一个线程中设置值,在另一个线程中通过关联的future获取该值。

主要特点:

  • 可以存储值或异常
  • 与future配对使用
  • 只能设置一次值

使用示例:

1
2
3
4
5
6
7
8
9
std::promise<int> promise;
std::future<int> future = promise.get_future();

std::thread producer([&promise]() {
    promise.set_value(42);  // 设置值
});

int value = future.get();  // 获取值
producer.join();

5. shared_future

std::shared_future 类似于 std::future,但可以被多个线程同时访问。它支持复制,允许多个线程等待同一个结果。

主要特点:

  • 可以复制,支持多个观察者
  • 所有副本都会收到相同的结果
  • 可以从 std::future 移动构造

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::promise<int> promise;
std::shared_future<int> shared_future = promise.get_future().share();

auto reader = [](std::shared_future<int> sf) {
    int value = sf.get();  // 多个线程可以同时调用get()
};

std::thread t1(reader, shared_future);
std::thread t2(reader, shared_future);

promise.set_value(42);
t1.join();
t2.join();

Contact

Feel free to contact me windmillyucong@163.com anytime for anything.


License

CC0