一个100行左右的简单线程池。用到了std::mutex和std::thread等新特性。
线程池模型
首先把每个函数抽象为一个任务(Task),任务的过程就是调用这个Task的run函数。
然后把线程池中的线程封装为一个线程类(Thread),一直等待调度器分配任务(空闲状态),如果有任务分配立即进入忙状态。等任务执行结束再次变为空闲状态。
最后是一个调度器类(TreadPool),包含任务队列(随时添加新任务),和一个包含了Thread的vector(线程池中的线程)。如果任务队列非空,调度器每次从中取出一个任务,然后轮询线程池,搜寻空闲线程并把这个任务交给线程。
模型如下图所示:
代码实现
下面的代码实现了上述模型。其中Task类通过睡眠一定的秒数模拟任务,可以看到T1先执行完毕(1秒完毕),T2和T3在之后同时完毕,说明调度非常成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
//linux, g++ -std=c++14 -o t *.cpp -pthread
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h>
class Task{
private:
int no;
public:
Task(int n){
no = n;
}
//可以继承这个类重写该方法执行任务
virtual void run(){
sleep(no); //构造时决定执行几秒,模拟线程运行
std::cout << no << "T\n";
}
};
class Thread{
private:
std::thread _thread;
bool _isfree;
Task *_task;
std::mutex _locker;
public:
//构造
Thread() : _isfree(true), _task(nullptr){
_thread = std::thread(&Thread::run, this);
_thread.detach(); //放到后台, join是等待线程结束
}
//是否空闲
bool isfree(){
return _isfree;
}
//添加任务
void add_task(Task *task){
if(_isfree){
_locker.lock();
_task = task;
_isfree = false;
_locker.unlock();
}
}
//如果有任务则执行任务,否则自旋
void run(){
while(true){
if(_task){
_locker.lock();
_isfree = false;
_task->run();
_isfree = true;
_task = nullptr;
_locker.unlock();
}
}
}
};
class ThreadPool{
private:
std::queue<Task *> task_queue;
std::vector<Thread *> _pool;
std::mutex _locker;
public:
//构造线程并后台执行,默认数量为10
ThreadPool(int n = 10){
while(n--){
Thread *t = new Thread();
_pool.push_back(t);
}
std::thread main_thread(&ThreadPool::run, this);
main_thread.detach();
}
//释放线程池
~ThreadPool(){
for(int i = 0;i < _pool.size(); ++i){
delete _pool[i];
}
}
//添加任务
void add_task(Task *task){
_locker.lock();
task_queue.push(task);
_locker.unlock();
}
//轮询
void run(){
while(true){
_locker.lock();
if(task_queue.empty()){
_locker.unlock();
continue;
}
// 寻找空闲线程执行任务
for(int i = 0; i < _pool.size(); ++i){
if(_pool[i]->isfree()){
_pool[i]->add_task(task_queue.front());
task_queue.pop();
break;
}
}
_locker.unlock();
}
}
};
int main(){
ThreadPool tp(2);
Task t1(1);
Task t2(3);
Task t3(2);
tp.add_task(&t1);
tp.add_task(&t2);
tp.add_task(&t3);
sleep(4); //等待调度器结束,不然会崩溃
return 0;
}
|