thread_pool.h 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. #ifndef thread_poll_
  2. #define thread_poll_
  3. #include<thread>
  4. #include<mutex>
  5. #include<shared_mutex>
  6. #include<vector>
  7. #include<tuple>
  8. #include<chrono>
  9. #include<condition_variable>
  10. #include<future>
  11. #include"../basic/nanxing_operator_check.h"
  12. #include"../basic/nanxing_basic_signal_linklist.h" //任务链表
  13. //要求使用std::package作为任务传递的包装器
  14. namespace nanxing
  15. {
  16. template<typename T> //一个不带有任务窃取的简单可伸缩队列(明天实现伸缩)
  17. class basic_thread_pool
  18. {
  19. static_assert(NANXING_BASIC_OPERATOR_(T,fun),"The type is illigel"); //验证T是否是一个可调用对象
  20. private:
  21. std::condition_variable task_empty;
  22. std::mutex queue_mutex; //任务队列锁
  23. nanxing::basic_signal_linklist<T> tasks; //任务队列
  24. int Max_size=1; //线程的最多数量,默认由硬件数量决定
  25. std::vector<std::thread> thread_group; //线程池中的线程
  26. std::vector<std::tuple<bool,bool>> control_thread; //控制线程的开闭,true的时候对应的线程处于开启状态
  27. private:
  28. void thread_function(bool& done,bool& situation) //线程工作函数,done从外部传入用于控制线程数量,状态用于指示线程是否完结
  29. {
  30. T* task;
  31. while(!done)
  32. {
  33. {
  34. //std::cout<<"a"<<std::endl;
  35. std::unique_lock<std::mutex> ulk(queue_mutex);
  36. task_empty.wait(ulk,[this]{return (bool(tasks.list_size()));}); //如果任务队列为空则阻塞
  37. if(done) //这个是必须的,因为在操作过程中有可能会被其他的线程改变
  38. {
  39. break;
  40. }
  41. task=(tasks.pop()->data); //从队列中获取函数
  42. ulk.unlock();
  43. }
  44. (*task)();
  45. }
  46. }
  47. public:
  48. //不允许移动和复制因为受到同步原语的限制,同步原语是不允许移动和复制的
  49. basic_thread_pool(basic_thread_pool&)=delete;
  50. basic_thread_pool(basic_thread_pool&& others)=delete;
  51. basic_thread_pool& operator=(basic_thread_pool others)=delete;
  52. basic_thread_pool& operator=(const basic_thread_pool& others)=delete;
  53. public:
  54. basic_thread_pool()
  55. {
  56. Max_size=std::thread::hardware_concurrency()*2;
  57. }
  58. int init_pool() noexcept
  59. {
  60. int count;
  61. for(int i=0;i<Max_size;i++)
  62. {
  63. count=0;
  64. control_thread.push_back(std::make_tuple(false,true));
  65. bool& data_tm_0=std::get<0>(control_thread[0]);
  66. bool&data_tm_1=std::get<1>(control_thread[0]);
  67. std::thread fun;
  68. Loop:
  69. try{
  70. fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
  71. }
  72. catch(...)
  73. {
  74. if(count<=5)
  75. {
  76. count++;
  77. goto Loop;
  78. }
  79. else
  80. {
  81. std::cerr<<"thread_build_error"<<std::endl;
  82. control_thread.pop_back();
  83. return -1;
  84. }
  85. }
  86. thread_group.push_back(std::move(fun));
  87. }
  88. return 1;
  89. }
  90. int get_size()
  91. {
  92. return Max_size;
  93. }
  94. int get_task_size()
  95. {
  96. return tasks.list_size();
  97. }
  98. int insert(T&& data)
  99. {
  100. int flag=0;
  101. int mid;
  102. {
  103. std::unique_lock<std::mutex> lk(queue_mutex);
  104. mid=tasks.push(std::move(data));
  105. if(tasks.list_size()==1)
  106. {
  107. flag=1;
  108. }
  109. }
  110. if(flag==1)
  111. {
  112. task_empty.notify_all();
  113. } //如果任务队列非空就随机启动一个阻塞线程
  114. return mid;
  115. }
  116. void close()
  117. {
  118. for(int i=0;i<control_thread.size();i++)
  119. {
  120. std::get<0>(control_thread[i])=true;
  121. }
  122. }
  123. int build_more_thread(size_t more=1) //如果不给出具体参数就赋值为1
  124. {
  125. Max_size+=more;
  126. int count;
  127. for(int i=0;i<more;i++)
  128. {
  129. count=0;
  130. control_thread.push_back(std::make_tuple(false,true));
  131. bool& data_tm_0=std::get<0>(control_thread[0]);
  132. bool&data_tm_1=std::get<1>(control_thread[0]);
  133. std::thread fun;
  134. Loop:
  135. try{
  136. fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
  137. }
  138. catch(...)
  139. {
  140. if(count<=5)
  141. {
  142. count++;
  143. goto Loop;
  144. }
  145. else
  146. {
  147. std::cerr<<"thread_build_error"<<std::endl;
  148. control_thread.pop_back(); //如果连续插入失败直接将没有绑定线程的标志位弹出
  149. return -1;
  150. }
  151. }
  152. thread_group.push_back(std::move(fun));
  153. }
  154. return 1;
  155. }
  156. void kill_thread(int count=1)
  157. {
  158. int size=control_thread.size();
  159. for(int i=0;i<count;i++)
  160. {
  161. std::get<0>(control_thread[size-1-i])=false; //线程,一种惰性关闭,如果线程处于阻塞的状态的时候会在下一轮运行完成后关闭
  162. control_thread.pop_back(); //从尾部把线程管理弹出
  163. thread_group.pop_back();
  164. }
  165. }
  166. ~basic_thread_pool()
  167. {
  168. int i=thread_group.size();
  169. if(i!=0)
  170. {
  171. for(i-1;i>=0;i--)
  172. {
  173. if(thread_group[i].joinable())
  174. {
  175. std::cout<<i<<std::endl;
  176. thread_group[i].join();
  177. }
  178. }
  179. }
  180. }
  181. };
  182. template<typename T>
  183. struct thread_group //构建线程组,每个线程组有自己的任务队列,任务使用双向链表的形式,当有需要的时候直接将整个链表全部移走
  184. {
  185. };
  186. template<typename T>
  187. class middle_pool //一个带有任务窃取的线程池
  188. {
  189. private:
  190. };
  191. }
  192. #endif