Contents

[C++]C++ 100行实现线程池

一个100行左右的简单线程池。用到了std::mutex和std::thread等新特性。

线程池模型

首先把每个函数抽象为一个任务(Task),任务的过程就是调用这个Task的run函数。
然后把线程池中的线程封装为一个线程类(Thread),一直等待调度器分配任务(空闲状态),如果有任务分配立即进入忙状态。等任务执行结束再次变为空闲状态。
最后是一个调度器类(TreadPool),包含任务队列(随时添加新任务),和一个包含了Thread的vector(线程池中的线程)。如果任务队列非空,调度器每次从中取出一个任务,然后轮询线程池,搜寻空闲线程并把这个任务交给线程。
模型如下图所示:

代码实现

下面的代码实现了上述模型。其中Task类通过睡眠一定的秒数模拟任务,可以看到T1先执行完毕(1秒完毕),T2和T3在之后同时完毕,说明调度非常成功。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//linux, g++ -std=c++14 -o t *.cpp -pthread
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h> 

class Task{
private:
    int no;
public:
    Task(int n){
        no = n;
    }
    //可以继承这个类重写该方法执行任务
    virtual void run(){
        sleep(no); //构造时决定执行几秒,模拟线程运行
        std::cout << no << "T\n";
    }

};

class Thread{
private:
    std::thread _thread;
    bool _isfree;
    Task *_task;
    std::mutex _locker;
public:
    //构造
    Thread() : _isfree(true), _task(nullptr){
        _thread = std::thread(&Thread::run, this);
        _thread.detach(); //放到后台, join是等待线程结束
    }
    //是否空闲
    bool isfree(){
        return _isfree;
    }
    //添加任务
    void add_task(Task *task){
        if(_isfree){
            _locker.lock();
            _task = task;
            _isfree = false;
            _locker.unlock();
        }
    }
    //如果有任务则执行任务,否则自旋
    void run(){
        while(true){
            if(_task){
                _locker.lock();
                _isfree = false;
                _task->run();
                _isfree = true;
                _task = nullptr;
                _locker.unlock();
            }
        }
    }

};

class ThreadPool{
private:
    std::queue<Task *> task_queue;
    std::vector<Thread *> _pool;
    std::mutex _locker;

public:
    //构造线程并后台执行,默认数量为10
    ThreadPool(int n = 10){
        while(n--){
            Thread *t = new Thread();
            _pool.push_back(t);
        }
        std::thread main_thread(&ThreadPool::run, this);
        main_thread.detach();
    }
    //释放线程池
    ~ThreadPool(){
        for(int i = 0;i < _pool.size(); ++i){
            delete _pool[i];
        }
    }
    //添加任务
    void add_task(Task *task){
        _locker.lock();
        task_queue.push(task);
        _locker.unlock();
    }
    //轮询
    void run(){
        while(true){
            _locker.lock();
            if(task_queue.empty()){
                _locker.unlock();
                continue;
            }
            // 寻找空闲线程执行任务
            for(int i = 0; i < _pool.size(); ++i){
                if(_pool[i]->isfree()){
                    _pool[i]->add_task(task_queue.front());
                    task_queue.pop();
                    break;
                }
            }
            _locker.unlock();
        }
    }

};

int main(){
    ThreadPool tp(2);
    
    Task t1(1);
    Task t2(3);
    Task t3(2);

    tp.add_task(&t1);
    tp.add_task(&t2);
    tp.add_task(&t3);
    
    sleep(4);   //等待调度器结束,不然会崩溃
    return 0;
}