C++并发编程系列总结-锁

C++并发编程系列:关于锁的使用总结

Posted by YuCong on February 6, 2021

C++ 并发编程系列,关于锁的使用总结。


Created 2021.02.15 by William Yu; Last modified: 2022.09.16-v1.1.0

Contact: windmillyucong@163.com

Copyleft! 2022 William Yu. Some rights reserved.


lock

References

0. Concepts

1. 锁的类型

1.1 互斥锁 mutex

互斥锁

  • 在某一时刻,只有一个线程可以获取互斥锁
  • 在释放互斥锁之前其他线程都不能获取该互斥锁
  • 如果其他线程想要获取这个互斥锁,那么这个线程只能以阻塞方式进行等待
1
2
3
#include <mutex>

std::mutex some_mutex;

1.2 条件锁 condition_variable

  • 条件锁 即 条件变量
  • 某一个线程因为某个条件为满足时可以使用条件变量使改程序处于阻塞状态
  • 一旦条件满足以“信号量”的方式唤醒一个因为该条件而被阻塞的线程
  • 最为常见就是在线程池中,起初没有任务时任务队列为空,此时线程池中的线程因为“任务队列为空”这个条件处于阻塞状态。一旦有任务进来,就会以信号量的方式唤醒一个线程来处理这个任务。

头文件:

1
#include <condition_variable>

类型:

  • std::condition_variable(只和std::mutex一起工作)
  • std::condition_variable_any(符合类似互斥元的最低标准的任何东西一起工作)

// todo(congyu)

  • wait()的实现接下来检查条件,并在满足时返回。如果条件不满足,wait()解锁互斥元,并将该线程置于阻塞或等待状态。
  • 当来自数据准备线程中对notify_one()的调用通知条件变量时,线程从睡眠状态中苏醒(解除其阻塞),重新获得互斥元上的锁,并再次检查条件,如果条件已经满足,就从wait()返回值,互斥元仍被锁定。如果条件不满足,该线程解锁互斥元,并恢复等待。
  • 如果等待线程只打算等待一次,那么当条件为true时它就不会再等待这个条件变量了,条件变量未必是同步机制的最佳选择。
  • 如果等待的条件是一个特定数据块的可用性时,这尤其正确。在这个场景中,使用期值(future)更合适。使用future等待一次性事件。

1.3 自旋锁 Spinlock

原理

  • 互斥锁是一种sleep-waiting 的锁
    • 流程为:假设线程T1获取互斥锁并且正在处理器core1上运行时,此时线程T2也想要获取互斥锁,但是由于T1正在使用互斥锁使得T2被阻塞。当T2处于阻塞状态时,T2被放入到等待队列中去,处理器core2会去处理其他任务而不必一直等待(忙等)。也就是说处理器不会因为线程阻塞而空闲着,它会去处理其他事务去。
  • 自旋锁是一种busy-waiting 的锁
    • 流程为:假设线程T1获取互斥锁并且正在处理器core1上运行时,此时线程T2也想要获取互斥锁,但是由于T1正在使用互斥锁使得T2被阻塞。此时运行T2的处理器core2会一直不断地循环检查锁是否可用,直到获取到锁为止。
    • 当发生阻塞时,互斥锁可以让CPU去处理其他的任务;而自旋锁让CPU一直不断循环请求获取这个锁。通过两个含义的对比可以我们知道“自旋锁”是比较耗费CPU的。

如果你能确定被锁住的代码执行时间很短,就不应该用互斥锁,而应该使用自旋锁,否则使用互斥锁。 Q: 实际项目使用中,差别大吗?

实现一个自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * @brief A spinlock implemented using an atomic_flag.
 * @remark Reference: https://en.cppreference.com/w/cpp/atomic/atomic_flag
 */
class Spinlock {
 public:
  Spinlock() = default;
  ~Spinlock() = default;

  void lock() {
    while (lock_.test_and_set(std::memory_order_acquire));
  }

  bool try_lock() {
    return !lock_.test_and_set(std::memory_order_acquire);
  }

  void unlock() {
    lock_.clear(std::memory_order_release);
  }

 private:
  std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
};

使用方法

1
2
3
4
5
6
7
Spinlock user_data_lock_;
UserData user_data_;
  
int main() {
    std::lock_guard<Spinlock> lock(user_data_lock_);
    user_data_ //balabala
}

1.4 读写锁 shared_mutex

  • “读者-写者”问题。

    • 计算机中某些数据被多个进程共享,对数据的操作有两种:

      • 一种是读操作,就是从数据库中读取数据不会修改数据库中内容;

      • 另一种就是写操作,写操作会修改数据库中存放的数据。

    • 我们允许对数据同时执行多个“读”操作

    • 但是某一时刻只能在数据库上有一个“写”操作来更新数据。

    • 这就是一个简单的读者-写者模型。

头文件

1
boost/thread/shared_mutex.cpp

类型

boost::shared_lock

// todo(congyu)

你可以使用boost::shared_mutex的实例来实现同步,而不是使用std::mutex的实例。对于更新操作,std::lock_guard<boost::shared_mutex>和 std::unique_lock<boost::shared_mutex>可用于锁定,以取代相应的std::mutex特化。这确保了独占访问,就像std::mutex那样。那些不需要更新数据结构的线程能够转而使用 boost::shared_lock<boost::shared_mutex>来获得共享访问。这与std::unique_lock用起来正是相同的,除了多个线程在同一时间,同一boost::shared_mutex上可能会具有共享锁。唯一的限制是,如果任意一个线程拥有一个共享锁,试图获取独占锁的线程会被阻塞,知道其他线程全都撤回它们的锁。同样的,如果一个线程具有独占锁,其他线程都不能获取共享锁或独占锁,直到第一个线程撤回它的锁。

1.5 递归锁

// todo(congyu)


2. 上锁操作

2.0 mutex.lock()

对于互斥量,可以使用互斥量的 lock() 和 unlock() 方法上锁和解锁。

  • 需要手动调用
1
2
3
4
5
6
7
std::vector<long> some_list;  // 共享的数据
std::mutex some_mutex;  // 互斥量
void some_thread_function(){
	some_mutex.lock();
    // 一些操作
    some_mutex.unlock();
}

2.1 std::lock_guard<>

std::lock_guard

自动

  • 自动上锁,自动解锁
  • 而对互斥锁的lock()和unlock()需要手动调用
  • 构造时自动上锁
  • 离开局部作用域,析构函数自动解锁
1
2
3
4
5
6
std::vector<long> some_list;  // 共享的数据
std::mutex some_mutex;  // 互斥量
void some_thread_function(){
	std::lock_guard<std::mutex> lock(some_mutex);    // 修改数据之前上锁
    // 一些操作
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <mutex>   // 头文件

std::vector<long> some_list;  // 共享的数据
std::mutex some_mutex;  // 互斥量

void add_to_list(int new_value) {
  std::lock_guard<std::mutex> lock(some_mutex);    // 修改数据之前上锁
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find) {
  std::lock_guard<std::mutex> lock(some_mutex);    // 访问数据之前上锁
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

2.2 std::unique_lock<>

std::unique_lock

轻度灵活锁

  • 可以提供第二参数:
    • std::adopt_lock 假定当前线程已经获得锁,不再请求
    • std::defer_lock 表示在构造锁的时候并不上锁,使互斥量保持在解锁状态
    • std::try_to_lock 尝试请求锁,但不阻塞线程,锁不可用时也会立即返回
  • 如果不提供第二参数,表示构造时同时也上锁需要手动解锁

    1
    2
    3
    
    std::unique_lock<std::mutex> some_lock(some_mutex);
    // do something
    some_lock.unlock();
    

###### std::adopt_lock

  • 假定当前线程已经获得锁,不再请求

    1
    
       std::unique_lock<std::mutex> some_lock(some_mutex, std::adopt_lock);
    
std::defer_lock
  • 表示在构造锁的时候并不上锁,使互斥量保持在解锁状态
  • std::unique_lock锁对象可以传给lock()对象作为参数

    1
    2
    3
    4
    5
    
       std::unique_lock<std::mutex> some_lock(some_mutex, std::defer_lock);
       std::lock(some_lock);
       some_lock.lock();
       some_lock.unlock();
       some_lock.try_lock();
    
std::try_to_lock
  • 尝试请求锁,但不阻塞线程,锁不可用时也会立即返回。

特点

  • std::unique_lockstd::lock_guard体积大,所以后者如果够用,建议优先使用后者

3. 死锁 deadlock

产生原因
  • 一个给定操作需要两个或两个以上的互斥量时,可能出现死锁
  • 与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做

举例:

两个线程,两个互斥量。线程1访问数据A时,将互斥量A上锁,然后开始操作,后面的某些操作中需要用到数据B,会尝试获取B的锁;与此同时,线程2在访问数据B时,将互斥量B上锁,然后开始操作,某些操作中需要用到数据A,会尝试获取A的锁。这种情况下,线程1在等待互斥量B的锁被释放,而线程2在等待互斥量A的锁被释放。产生死锁。

避免的措施
1. 总是按顺序上锁
  • 无死锁的代码风格 deadlock-free:
    • 建议互斥量总是以相同的顺序上锁。(上面的例子中,线程1和2都先锁互斥量A,再锁互斥量B) – 但这种方法并不总是奏效:slightly_smiling_face:
    • 一个线程只持有一个锁,当已经持有一个锁时,不要再去获取第二把锁
    • 使用分层互斥锁
2. 同时上两个锁

同时上两个锁:

1
2
3
    std::lock(user_data_mutex_, user_data2_mutex_);
    std::lock_guard<std::mutex> lock1(user_data_mutex_, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(user_data2_mutex_, std::adopt_lock);
std::lock()

两个步骤:

  1. C++ 标准库中提供了std::lock()函数,能够保证将多个互斥锁同时上锁。 备注: std::lock(): c++标准提供的解决方案。可以一次锁住多个互斥量,没有死锁风险
  2. 然后,使用lock_guard,加参数std::adopt_lock表示无需再次上锁。 备注:因为互斥锁已经被上锁了,那么lock_guard构造的时候不应该上锁,只是需要在析构的时候释放锁就行了,使用参数std::adopt_lock表示无需上锁:
1
2
3
std::lock(_mu, _mu2);
std::lock_guard<std::mutex> lock1(_mu, std::adopt_lock);
std::lock_guard<std::mutex> lock2(_mu2, std::adopt_lock);

完整的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <glog/logging.h>
#include <iostream>
#include <mutex>
#include <thread>

struct Data {
  int num = 0;
  std::string str = "0";
};

// 数据
std::mutex user_data_mutex_;
Data user_data;

std::mutex user_data2_mutex_;
Data user_data2;

void TestSolveDeadLock() {
  // 死锁的解决
  LOG(ERROR) << "=======TestSolveDeadLock=======";

  // 第一个线程
  std::thread sub_thread1 = std::thread([] {
    std::lock(user_data_mutex_, user_data2_mutex_);
    std::lock_guard<std::mutex> lock1(user_data_mutex_, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(user_data2_mutex_, std::adopt_lock);
    std::this_thread::sleep_for(std::chrono::milliseconds(50));

    LOG(ERROR) << "sub_thread1 " << user_data.num << "," << user_data.str;
  });

  // 第二个线程
  std::thread sub_thread2 = std::thread([] {
    std::lock(user_data_mutex_, user_data2_mutex_);
    std::lock_guard<std::mutex> lock1(user_data_mutex_, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(user_data2_mutex_, std::adopt_lock);
    std::this_thread::sleep_for(std::chrono::milliseconds(50));

    LOG(ERROR) << "sub_thread2 " << user_data.num << "," << user_data.str;
  });

  sub_thread1.join();
  sub_thread2.join();
}

Contact

Feel free to contact me windmillyucong@163.com anytime for anything.


License

CC0