线程池
Created 2021.02.08 by Cong Yu; Last modified: 2021.02.08-v1.0.1
Contact: windmillyucong@163.com
Copyleft! 2022 Cong Yu. Some rights reserved.
线程池
References
- https://wangpengcheng.github.io/2019/05/17/cplusplus_theadpool/
基本概念
线程池: 当进行并行的任务作业操作时,线程的建立与销毁的开销是阻碍性能进步的关键,因此线程池诞生。提前创建好多个线程,往线程池中提交任务,无限制循环等待队列,进行计算和操作。
开销计算:
1
2
3
4
5
6
7
T1: 线程创建时间
T2: 线程执行时间
T3: 线程销毁时间
我们不希望有的开销: T1和T3,
这部分的比例为 (T1+T3)/(T1+T2+T3),
所以T2越大,越划算,并且尽量减少T1和T3。
组成:
- 线程管理器: 创建、初始化线程,管理调度
- 工作线程:线程中等待并分配执行任务
- 任务接口:添加任务的接口
- 任务队列:存放未处理的任务
工作的四种情况:
- 空闲(休眠):没有任务执行,没有任务缓存
- 部分忙碌部分空闲:有任务在执行中,没有任务缓存
- 全部忙碌:有任务在执行,有任务缓存待执行
- 全部忙碌,激活备用线程:有任务在执行,且缓存任务超出队列阈值
code
实现ThreadPool,以及一个public的任务提交接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Define a thread pool.
*/
class ThreadPool {
public:
ThreadPool(unsigned int n = std::thread::hardware_concurrency());
~ThreadPool();
template <class F, class... Args>
auto CommitTask(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
bool Finished();
unsigned int GetProcessed();
private:
std::vector<std::thread> workers_;
std::deque<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable cv_task_;
unsigned int busy_;
std::atomic_uint processed_;
bool shutdown_;
void ThreadProcess();
};
template <class F, class... Args>
auto ThreadPool::CommitTask(F &&f, Args &&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// don't allow enqueueing after stopping the pool
if (shutdown_)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks_.emplace_back([task]() { (*task)(); });
}
cv_task_.notify_one();
return res;
}
其中:
- std::vector<std::thread> workers_; 即 工作线程们
- std::deque<std::function<void()» tasks_; 即 任务队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ThreadPool::ThreadPool(unsigned int n) : busy_(), processed_(), shutdown_() {
for (unsigned int i = 0; i < n; ++i) {
workers_.emplace_back(std::bind(&ThreadPool::ThreadProcess, this));
}
}
ThreadPool::~ThreadPool() {
std::unique_lock<std::mutex> latch(queue_mutex_);
shutdown_ = true;
latch.unlock();
cv_task_.notify_all();
for (auto &worker : workers_)
worker.join();
}
void ThreadPool::ThreadProcess() {
while (true) {
std::unique_lock<std::mutex> latch(queue_mutex_);
cv_task_.wait(latch, [this]() { return shutdown_ || !tasks_.empty(); });
if (shutdown_) {
break;
}
if (!tasks_.empty()) {
++busy_;
auto task = tasks_.front();
tasks_.pop_front();
latch.unlock();
task();
++processed_;
latch.lock();
--busy_;
}
}}
bool ThreadPool::Finished() {
std::unique_lock<std::mutex> lock(queue_mutex_);
return tasks_.empty() && (busy_ == 0);
}
unsigned int ThreadPool::GetProcessed() { return processed_; }
Contact
Feel free to contact me windmillyucong@163.com anytime for anything.