作者:一只大喵咪1201
专栏:《Linux学习》
格言:你只管努力,剩下的交给时间!

线程池 | 单例模式

  • 一、 线程池
    • 1.1 Thread.hpp
    • 1.2 ThreadPool.hpp
    • 1.3 main.cpp
    • 1.4 RAII方式加锁
  • 二、 单例模式
    • 2.1 饿汉模式
    • 2.2 懒汉模式
  • 三、 总结

多线程部分的知识讲解到此就告一段落了,现在创建一个线程池来检验一下我们的学习成果。

一、 线程池

  • 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。
  • 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建与销毁线程的代价。
  • 线程池不仅能够保证内核的充分利用,还能防止过分调度。
  • 可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

每个线程在创建的时候会进行一系列系统调用,所以线程创建是有系统开销的。如果每需要一个线程再去创建,就会导致系统的性能下降等问题。

所以线程池就是维护着一些已经创建但是处于阻塞等待状态的线程,当有任务需要处理时,线程被唤醒并且执行对应的任务。

此时就避免了新建线程的系统开销,并且提高了响应效率。

1.1 Thread.hpp

这是一个小组件,在前面学习线程的时候本喵讲解过,现在将其拿过来并进行一定的改造:

#include #include #include #include const int name_size = 1024;class Thread{typedef std::function<void*(void*)> func_t;private:static void* start_routine(void* args){Thread* _this = static_cast<Thread*>(args);return _this->callback();}public://构造函数Thread(){//构建线程名字char namebuffer[name_size];snprintf(namebuffer,sizeof namebuffer,"thread-%d",_threadNum++);_name = namebuffer;}//启动线程void start(func_t func, void* args = nullptr){_func = func;_args = args;//创建线线程int n = pthread_create(&_tid,nullptr,start_routine,this);assert(n==0);(void)n;}//回调函数void* callback(){//调用新线程函数return _func(_args);}//获取线程名字std::string threadname(){return _name;}//线程等待void join(){int n = pthread_join(_tid,nullptr);assert(n==0);(void)n;}private:std::string _name;//线程名字pthread_t _tid;//线程tidvoid* _args;//传给线程函数的参数func_t _func;//线程函数static int _threadNum;//线程编号};int Thread::_threadNum = 1;//定义初始值是1

成员变量包括线程名字_name,线程_tid,给线程函数传递的参数_args,以及线程要执行的函数_func,还有线程编号_threadNum


  • _func使用了包装器,将返回值为void*,参数类型为void*的函数包装,并且重命名。
  • 在构造函数中不创建线程,仅仅是形成线程的名字,并且赋给_name,线程编号使用的是_threadNum,这是一个static变量,必须在类外进行定义初始化,每创建一个线程就将该值加一。
  • 成员函数start()有两个形参funcargs,在调用该成员函数的时候需要将新线程执行的函数以及参数传给start(),线程是在该接口中创建并开始执行的。

创建新线程:

  • 在使用pthread_create创建新线程时,传给新线程的执行函数是start_routine,每创建一个线程都会去执行这个函数。
  • 如果start_routine是一个普通成员函数,那么它就会隐藏存在第一个参数this指针,它的形参就成了(Thread* const this, void* args),而创建新线程的时候传递的函数必须只能有一个参数void* args
  • 所以使用static修饰成员函数start_routine,此时就没有了this指针,创建的新新线程就可以调用它了。
  • 由于创建新线程的时候是在类内创建的,所以将start_routine设置成私有。

回调:

  • 创建一个回调成员函数callback()start_routine去调用,在回调函数内部,再去调用传参时传入的真正要执行的函数_func
  • start_routine是静态成员函数,是没有this指针的,所以是无法直接调用普通成员函数和普通成员变量的,所以在创建新线程时,给start_routine传的形参void* args就是当前线程对象的this指针
  • start_routine中,通过this指针来调用回调函数callback,再在回调函数中调用_func

综上所诉,在调用start(func_t func, void* args)后新线程执行的函数就是传入的形参——函数指针。


经过测试,我们封装的创建新线程的类是没有问题的。

1.2 ThreadPool.hpp

在这个类中,将实现多个线程的创建和维护,和一个基于阻塞队列的生产者消费者模型。其中生产者就是生成任务的线程,而消费者就是所维护的好几个线程,阻塞队列和所有消费者共同组成线程池

#include #include #include #include #include "Thread.hpp"const int threadNum = 10;//前置声明template <class T>class ThreadPool;//线程属性template <class T>class ThreadData{public:ThreadPool<T>* threadpool;//线程池this指针std::string _threadname;//线程名字//构造函数ThreadData(ThreadPool<T>* tp, std::string name):threadpool(tp),_threadname(name){}};//线程池template <class T>class ThreadPool{private:static void* handerTask(void* args){ThreadData<T>* tpd = static_cast<ThreadData<T>*>(args);while(1){tpd->threadpool->lockQueue();//加锁while(tpd->threadpool->isQueueEmpty()){//任务队列为空,进行等待tpd->threadpool->threadWait();}T t = tpd->threadpool->pop();//获取任务到线程独立的栈结构中tpd->threadpool->unlockQueue();//解锁std::cout<<tpd->_threadname<<",接受了任务:"<<t.toTaskString()<<",并处理完成:"<< t() <<std::endl;//处理任务}delete tpd;return nullptr;}public://静态成员函数访问非静态成员接口bool isQueueEmpty() {return _task_queue.empty();}//判断任务队列是否为空void lockQueue() {pthread_mutex_lock(&_mutex);}//给任务队列加锁void unlockQueue() {pthread_mutex_unlock(&_mutex);}//给任务队列解锁void threadWait() {pthread_cond_wait(&_cond,&_mutex);}//将线程放入条件变量的等待队列中//获取任务T pop(){T t = _task_queue.front();_task_queue.pop();return t;}public:ThreadPool(const int& num = threadNum):_num(num){pthread_mutex_init(&_mutex,nullptr);//初始化互斥锁pthread_cond_init(&_cond,nullptr);//初始化条件变量//创建一批线程for(size_t i = 0; i < _num; ++i){_threads.push_back(new Thread());}}//所有线程启动void run(){for(const auto& t : _threads){//线程属性初始化ThreadData<T>* tpd = new ThreadData<T>(this,t->threadname());t->start(handerTask,tpd);std::cout<<t->threadname()<<" start..."<<std::endl;//显式已经启动的线程}}//推送任务void push(T& in){_mtx.lock();//加锁_task_queue.push(in);pthread_cond_signal(&_cond);_mtx.unlock();//解锁}~ThreadPool(){pthread_mutex_destroy(&_mutex);//摧毁互斥锁pthread_cond_destroy(&_cond);//摧毁条件变量//释放所有线程for(const auto& t : _threads){delete t;}}private:int _num;//维护的线程数量std::vector<Thread*> _threads;//多个线程std::queue<T> _task_queue;//任务队列pthread_mutex_t _mutex;//互斥锁pthread_cond_t _cond;//条件变量std::mutex _mtx;//生成任务时的互斥锁};

成员变量包含线程池的线程数量_num,管理线程的数据结构_threads,这是一个vector容器,存放任务的任务队列_task_queue,保证多线程互斥访问任务队列的互斥锁_mutex,以及让多线程同步的条件变量_cond,生成任务时使用的互斥锁使用的是C++11线程库提供的std::mutex

创建线程池:

  • 线程池构造函数的形参使用缺失值num,该值是确定线程池中维护的线程数量,用户也可以自己在构造的时候指定num
  • 在构造函数中,将要使用到的互斥锁_mutex条件变量_cond进行初始化,并且创建指定数量的Thread对象,将其地址放入到vector容器中进行管理。此时仅有Thread对象,新线程还没有被创建。
  • 在线程池的析构函数中将互斥锁和条件变量销毁,以及vector中的Thread对象也全部释放掉,因为是new出来的,需要主动归还资源。

创建一批线程:

  • 提供一个接口run(),通过该接口真正创建对应数量的新线程,并且开始执行,每成功创建一个且开始执行后,打印该线程开始运行的信息threadname() start run...
  • run()函数内调用的是前面Threadstart()方法,只需要让vector容器中的所有Thread对象调用该方法,所有的线程就会启动。
  • 所有线程在启动时执行的都是handerTask()函数,同Thread中一样,需要将该函数的this指针去掉,所以这是一个static成员函数,没有this指针。
  • 创建一个ThreadData结构体,用来存放线程属性,包括线程池的this指针ThreadPool* threadpool和当前启动线程的名字_threadname
  • 在创建线程时需要将线程池的this指针和当前线程的名字当作形参传给handerTask静态函数。

从任务队列中取任务:

  • handerTask中,线程池中的所有线程从任务队列_task_queue中取任务去执行,并且要按照一定顺序去访问,所以多线程之间是同步和互斥的关系。
  • 在访问任务队列的时候先加锁,如果任务队列为空则挂起等待,如果不为空则取走任务并处理任务。
  • 因为handerTask是一个静态成员函数,所以该函数无法直接访问非静态成员,必须通过this指针。
  • 又因为执行handerTaskThread类对象在执行,所以handerTask中不能直接访问ThreadPool中的私有成员。所以提供了公有的接口供handerTask来访问私有成员,进行加锁,解锁,条件判断,以及取任务等操作。
  • 从任务队列中获取任务后,应该在解锁之后进行任务处理。
  • 当线程从任务队列中获取到任务以后,本质是将任务队列中的任务获取到自己独立的栈结构中,所以此时对于任务的处理所有线程是相互独立的。
  • 如果处理任务放在解锁之前,那么所有线程只能先加锁,再获取任务并处理,再解锁,就成了串行的了。
  • 线程处理完任务后,将在堆区存放当前线程属性的ThreadData对象释放掉。

推送任务:

  • 任务队列中的任务是由主线程或者是生产者推送进来的,如果是多线程推送任务,同样会存在线程安全问题,所以推送任务也是互斥的,这里使用的是C++11线程库中的互斥锁std::mutex
  • 在推送任务到任务队列前加锁,推送完成后唤醒在条件变量_cond下等待的一个线程,再进行解锁。

1.3 main.cpp

main函数中,要做的就是创建线程池,将所有线程启动,然后推送相应的任务到线程池中。

Task.hpp贴图:

这个模板类在前面已经出现很多次了,本喵就不再详细讲解了,主要的功能就是构建任务,获取到任务的线程通过调用该类中的仿函数来执行相应的逻辑。

main.cpp:

int main(){srand((unsigned int)time(nullptr)^getpid()^0x11223344);//产生随机数种子std::unique_ptr<ThreadPool<CalTask>> tp(new ThreadPool<CalTask>);//智能指针管理线程池tp->run();//启动所有线程int x,y;char op;//每隔1秒向线程池中推送一个任务while(1){//生成任务x = rand()%10 + 1;y = rand()%10 + 1;op = oper[rand()%oper.size()];CalTask t(x,y,op,myath);//推送任务tp->push(t);std::cout<<"主线程推送任务推送任务:"<<t.toTaskString()<<std::endl;sleep(1);}return 0;}

main函数中,向线程池中推送的是计算任务,两个操作数以及进行的运算操作都是随机生成的,然后构建CalTask对象,并推送到线程池中。推送完成后打印推送的任务。

  • 在线程池中维护着3个线程,这三个线程在任务队列中没有任务的时候,均处于阻塞等待状态,是被挂起的。
  • 主线程每推送一个任务到线程池,就会有一个线程被唤醒取任务队列中获取并处理任务。
  • 3个线程按照一定的顺序从任务队列中获取任务并处理。
  • 根据上面代码可以看到,线程池是根本不知道推送到任务队列中的任务是什么。所维护的线程同样也不知道。
  • 具体是什么任务是由任务的推送方决定的,线程池只负责从任务队列中获取并处理任务

1.4 RAII方式加锁

LockGuard.hpp:

#include #include class Mutex{public:Mutex(pthread_mutex_t* lock_p = nullptr):_lock_p(lock_p){}void lock(){pthread_mutex_lock(_lock_p);//加锁}void unlock(){pthread_mutex_unlock(_lock_p);//解锁}private:pthread_mutex_t* _lock_p;};class LockGuard{public:LockGuard(pthread_mutex_t* lock_p):_mutex(lock_p){_mutex.lock();//构造函数内加锁}~LockGuard(){_mutex.unlock();//析构函数内解锁}private:Mutex _mutex;};

创建一个LockGuard类模仿C++11中的lock_guard,在构造函数中加锁,析构函数中解锁,将锁的生命周期和对象的生命周期绑定在一起。

  • 创建一个Mutex类,该类用pthread_mutex_t*类型的锁初始化,并且包括加锁和解锁两个接口。
  • LockGuard中包括Mutex这个类,在构造函数中的初始化列表中,使用外部传进来的pthread_mutex_t*类型的锁定义Mutex对象,在构造函数中进行加锁。
  • 在析构函数中进行解锁。

获取任务:

  • handerTask中,使用LockGuard对任务队列进行RAII方式的加锁。
  • 同样需要一个公共接口供handerTask获取线程池中的私有变量互斥锁_mtuex,以此来构造LockGuard对象。
  • 加一个代码块,来控制LockGuard对象的生命周期的起始和结束。

推送任务:

  • 推送任务的接口中,使用的是C++11线程库提供的std::lock_guard进行RAII方式的加锁和解锁。
  • 开始执行push函数的时候加锁,执行完毕后解锁。

使用RAII的加锁方式后,程序的运行结果和之前一样,没有发生改变。

二、 单例模式

在C++11中的特殊类设计中,本喵详细讲解过单例模式的原理以及设计,有兴趣的小伙伴可以去看看传送门,这里本喵就不再介绍了。

  • 我们在使用malloc以及new等函数时,系统并不会立刻给我们在物理内存中开辟相应的空间,只是将虚拟地址空间中startend指针的地址范围扩大。
  • 当第一次使用当开辟的动态空间时,会发生缺页中断,操作系统在页表中建立相应的映射关系,并且在物理内存中开辟对应的空间。

这是一种典型的延时加载模式,就是单例模式中的懒汉模式一样。试想,如果在使用malloc的时候就开辟真实的物理空间,如果有10个100个进程开辟空间,但是确不使用,此时就会浪费物理空间中的内存,甚至导致因为内存不足而无法调度其他线程。

上面仅是一个背景知识的补充,下面本喵来将前面实现的线程池改成单例模式。

2.1 饿汉模式

饿汉模式就是在执行main函数之间,将单例创建出来:

  • 在线程池的私有成员变量中加一个它本身的静态成员变量static ThreadPool _singleton,该成员在静态区,只能有一个,所以它就是单例。
  • 类的静态成员必须在类外进行定义初始化,所以在类外定义创建单例对象_singleton

  • 将拷贝构造函数私有化,只有定义静态单例对象_singleton的时候可以调用,其他位置无法调用,也就无法创建对象。
  • 为了防止单例对象被拷贝,将拷贝构造函数以及赋值运算符重载函数都使用delete禁掉。


提供一个获取单例对象的公共接口GetInstance(),该对象是一个静态成员函数。

  • 单例对象是一个私有的静态成员变量,所以在类外是无法直接访问的,除了通过接口就无法拿到这个单例对象去使用。
  • 如果GetInstance不是静态成员函数,是一个普通的成员函数,那么调用它时必须传this指针。但是此时相当于不存在单例对象,也就无法调用GetInstance
  • 而静态的GetInstance在调用时不用传this指针,以为它只属于类而不属于对象,而且静态成员函数可以直接访问类中的静态成员。所以通过GetInstance就可以直接获取到单例对象_singleton去使用。


main.cpp包含了Thread.hpp头文件,所以在预处理后,main()函数的前面就有定义创建单例对象的语句。

  • main函数中,使用单例的线程池对象都得通过静态成员函数GetInstance去获取,然后再执行和之前一样的操作。

运行结果和之前一样,本喵就不贴图了。

2.2 懒汉模式

懒汉模式讲究的就是一个延时加载,既然操作系统在很多方面都采用这种方式,说明这种方式非常重要,同样这里将线程池再改造成懒汉模式的单例对象:

增加静态成员变量,线程池本身对象的指针_singleton,增加一把静态的锁_singlock,用来维护单例对象的线程安全,如上图中红色框中所示。

静态成员变量必须在类外进行定义初始化:

  • 单例对象指针变量的定义初始化:

ThreadPool*中的ThreadPool虽然还没有实例化,但是并不妨碍给ThreadPool*这个指针赋值为空,就像void*虽然不知道void是什么类型,但是却可以给这个指针赋值。

  • 静态互斥锁的初始化:

单例对象只有一个,所以也只需要一个互斥锁来维护线程安全, 所以同样放在静态区上。

std::mutex表示互斥锁是标准库中的互斥锁类型,ThreadPool表示是在先线程池这个作用域中。


  • 在第一次使用单例对象的时候再在堆区new一个单例对象出来。
  • 为了维护单例对象的线程安全,所以在判断单例对象是否存在的时候,需要加锁。
  • 为了提高效率,单例对象被创建后就不再申请锁去判断,采样双检查加锁的方式。

其他内容,像构造函数,拷贝构造以及赋值运算符重载等处理和饿汉模式一样。


此时在main函数的红色框中第一次使用单例对象,所以在这里创建单例对象,在绿色框中以及之后使用单例对象的时候,仅仅是获取单例对象。


从运行结果上看,和之前的一样。

三、 总结

这篇文章中并没有新的内容,将前面学习的和线程有关的内容进行了一个应用。至此,Liux系统部分的学习就暂时告一段落,接下来就要开启网络的学习了。