thread_pool.h 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #ifndef thread_poll_
  2. #define thread_poll_
  3. #include<thread>
  4. #include<mutex>
  5. #include<shared_mutex>
  6. #include<vector>
  7. #include<tuple>
  8. #include<chrono>
  9. #include<condition_variable>
  10. #include<future>
  11. #include"../basic/nanxing_operator_check.h"
  12. #include"../basic/nanxing_basic_signal_linklist.h" //任务链表
  13. namespace nanxing
  14. {
  15. template<typename T> //一个不带有任务窃取的简单可伸缩队列(明天实现伸缩)
  16. class basic_thread_pool
  17. {
  18. static_assert(NANXING_BASIC_OPERATOR_(T,fun),"The type is illigel"); //验证T是否是一个可调用对象
  19. private:
  20. std::condition_variable task_empty;
  21. std::mutex queue_mutex; //任务队列锁
  22. nanxing::basic_signal_linklist<T> tasks; //任务队列
  23. int Max_size=1; //线程的最多数量,默认由硬件数量决定
  24. std::vector<std::thread> thread_group; //线程池中的线程
  25. std::vector<std::tuple<bool,bool>> control_thread; //控制线程的开闭,true的时候对应的线程处于开启状态
  26. private:
  27. void thread_function(bool& done,bool& situation) //线程工作函数,done从外部传入用于控制线程数量,状态用于指示线程是否完结
  28. {
  29. T* task;
  30. while(!done)
  31. {
  32. {
  33. //std::cout<<"a"<<std::endl;
  34. std::unique_lock<std::mutex> ulk(queue_mutex);
  35. task_empty.wait(ulk,[&]{return (bool(tasks.list_size()));}); //如果任务队列为空则阻塞
  36. if(done)
  37. {
  38. break;
  39. }
  40. task=(tasks.pop()->data); //从队列中获取函数
  41. ulk.unlock();
  42. }
  43. (*task)();
  44. }
  45. }
  46. public:
  47. //不允许移动和复制因为受到同步原语的限制,同步原语是不允许移动和复制的
  48. basic_thread_pool(basic_thread_pool&)=delete;
  49. basic_thread_pool(basic_thread_pool&& others)=delete;
  50. basic_thread_pool& operator=(basic_thread_pool others)=delete;
  51. basic_thread_pool& operator=(const basic_thread_pool& others)=delete;
  52. public:
  53. basic_thread_pool()
  54. {
  55. Max_size=std::thread::hardware_concurrency()*2;
  56. }
  57. int init_pool() noexcept
  58. {
  59. int count;
  60. for(int i=0;i<Max_size;i++)
  61. {
  62. count=0;
  63. control_thread.push_back(std::make_tuple(false,true));
  64. bool& data_tm_0=std::get<0>(control_thread[0]);
  65. bool&data_tm_1=std::get<1>(control_thread[0]);
  66. std::thread fun;
  67. Loop:
  68. try{
  69. fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
  70. }
  71. catch(...)
  72. {
  73. if(count<=5)
  74. {
  75. count++;
  76. goto Loop;
  77. }
  78. else
  79. {
  80. std::cerr<<"thread_build_error"<<std::endl;
  81. control_thread.pop_back();
  82. return -1;
  83. }
  84. }
  85. thread_group.push_back(std::move(fun));
  86. }
  87. return 1;
  88. }
  89. int get_size()
  90. {
  91. return Max_size;
  92. }
  93. int get_task_size()
  94. {
  95. return tasks.list_size();
  96. }
  97. int insert(T&& data)
  98. {
  99. int flag=0;
  100. int mid;
  101. {
  102. std::unique_lock<std::mutex> lk(queue_mutex);
  103. mid=tasks.push(std::move(data));
  104. if(tasks.list_size()==1)
  105. {
  106. flag=1;
  107. }
  108. }
  109. if(flag==1)
  110. {
  111. task_empty.notify_all();
  112. } //如果任务队列非空就随机启动一个阻塞线程
  113. return mid;
  114. }
  115. void close()
  116. {
  117. for(int i=0;i<control_thread.size();i++)
  118. {
  119. std::get<0>(control_thread[i])=true;
  120. }
  121. }
  122. int build_more_thread(size_t more=1) //如果不给出具体参数就赋值为1
  123. {
  124. Max_size+=more;
  125. int count;
  126. for(int i=0;i<more;i++)
  127. {
  128. count=0;
  129. control_thread.push_back(std::make_tuple(false,true));
  130. bool& data_tm_0=std::get<0>(control_thread[0]);
  131. bool&data_tm_1=std::get<1>(control_thread[0]);
  132. std::thread fun;
  133. Loop:
  134. try{
  135. fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
  136. }
  137. catch(...)
  138. {
  139. if(count<=5)
  140. {
  141. count++;
  142. goto Loop;
  143. }
  144. else
  145. {
  146. std::cerr<<"thread_build_error"<<std::endl;
  147. control_thread.pop_back(); //如果连续插入失败直接将没有绑定线程的标志位弹出
  148. return -1;
  149. }
  150. }
  151. thread_group.push_back(std::move(fun));
  152. }
  153. return 1;
  154. }
  155. void kill_thread(int count=1)
  156. {
  157. int size=control_thread.size();
  158. for(int i=0;i<count;i++)
  159. {
  160. std::get<0>(control_thread[size-1-i])=false; //线程,一种惰性关闭,如果线程处于阻塞的状态的时候会在下一轮运行完成后关闭
  161. control_thread.pop_back(); //从尾部把线程管理弹出
  162. thread_group.pop_back();
  163. }
  164. }
  165. ~basic_thread_pool()
  166. {
  167. int i=thread_group.size();
  168. if(i!=0)
  169. {
  170. for(i-1;i>=0;i--)
  171. {
  172. if(thread_group[i].joinable())
  173. {
  174. std::cout<<i<<std::endl;
  175. thread_group[i].join();
  176. }
  177. }
  178. }
  179. }
  180. };
  181. template<typename T>
  182. struct thread_group //构建线程组,每个线程组有自己的任务队列,任务使用双向链表的形式,当有需要的时候直接将整个链表全部移走
  183. {
  184. };
  185. template<typename T>
  186. class middle_pool //一个带有任务窃取的线程池
  187. {
  188. private:
  189. };
  190. }
  191. #endif