123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- #ifndef thread_poll_
- #define thread_poll_
- #include<thread>
- #include<mutex>
- #include<shared_mutex>
- #include<vector>
- #include<tuple>
- #include<chrono>
- #include<condition_variable>
- #include<future>
- #include"../basic/nanxing_operator_check.h"
- #include"../basic/nanxing_basic_signal_linklist.h" //任务链表
- //要求使用std::package作为任务传递的包装器
- namespace nanxing
- {
- template<typename T> //一个不带有任务窃取的简单可伸缩队列(明天实现伸缩)
- class basic_thread_pool
- {
- static_assert(NANXING_BASIC_OPERATOR_(T,fun),"The type is illigel"); //验证T是否是一个可调用对象
- private:
- std::condition_variable task_empty;
- std::mutex queue_mutex; //任务队列锁
- nanxing::basic_signal_linklist<T> tasks; //任务队列
- int Max_size=1; //线程的最多数量,默认由硬件数量决定
- std::vector<std::thread> thread_group; //线程池中的线程
- std::vector<std::tuple<bool,bool>> control_thread; //控制线程的开闭,true的时候对应的线程处于开启状态
-
- private:
- void thread_function(bool& done,bool& situation) //线程工作函数,done从外部传入用于控制线程数量,状态用于指示线程是否完结
- {
- T* task;
- while(!done)
- {
- {
- //std::cout<<"a"<<std::endl;
- std::unique_lock<std::mutex> ulk(queue_mutex);
- task_empty.wait(ulk,[this]{return (bool(tasks.list_size()));}); //如果任务队列为空则阻塞
- if(done) //这个是必须的,因为在操作过程中有可能会被其他的线程改变
- {
- break;
- }
- task=(tasks.pop()->data); //从队列中获取函数
- ulk.unlock();
- }
- (*task)();
-
- }
- }
- public:
- //不允许移动和复制因为受到同步原语的限制,同步原语是不允许移动和复制的
- basic_thread_pool(basic_thread_pool&)=delete;
- basic_thread_pool(basic_thread_pool&& others)=delete;
- basic_thread_pool& operator=(basic_thread_pool others)=delete;
- basic_thread_pool& operator=(const basic_thread_pool& others)=delete;
- public:
- basic_thread_pool()
- {
- Max_size=std::thread::hardware_concurrency()*2;
- }
- int init_pool() noexcept
- {
- int count;
- for(int i=0;i<Max_size;i++)
- {
- count=0;
- control_thread.push_back(std::make_tuple(false,true));
- bool& data_tm_0=std::get<0>(control_thread[0]);
- bool&data_tm_1=std::get<1>(control_thread[0]);
- std::thread fun;
- Loop:
-
- try{
- fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
-
- }
- catch(...)
- {
- if(count<=5)
- {
- count++;
- goto Loop;
- }
- else
- {
- std::cerr<<"thread_build_error"<<std::endl;
- control_thread.pop_back();
-
- return -1;
- }
- }
- thread_group.push_back(std::move(fun));
- }
- return 1;
- }
- int get_size()
- {
- return Max_size;
- }
- int get_task_size()
- {
- return tasks.list_size();
- }
- int insert(T&& data)
- {
- int flag=0;
- int mid;
- {
- std::unique_lock<std::mutex> lk(queue_mutex);
- mid=tasks.push(std::move(data));
- if(tasks.list_size()==1)
- {
- flag=1;
-
- }
- }
- if(flag==1)
- {
- task_empty.notify_all();
- } //如果任务队列非空就随机启动一个阻塞线程
- return mid;
- }
- void close()
- {
- for(int i=0;i<control_thread.size();i++)
- {
- std::get<0>(control_thread[i])=true;
- }
- }
- int build_more_thread(size_t more=1) //如果不给出具体参数就赋值为1
- {
- Max_size+=more;
- int count;
- for(int i=0;i<more;i++)
- {
- count=0;
- control_thread.push_back(std::make_tuple(false,true));
- bool& data_tm_0=std::get<0>(control_thread[0]);
- bool&data_tm_1=std::get<1>(control_thread[0]);
- std::thread fun;
- Loop:
- try{
- fun=std::thread(&basic_thread_pool<T>::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1));
-
- }
- catch(...)
- {
- if(count<=5)
- {
- count++;
- goto Loop;
- }
- else
- {
- std::cerr<<"thread_build_error"<<std::endl;
- control_thread.pop_back(); //如果连续插入失败直接将没有绑定线程的标志位弹出
- return -1;
- }
- }
- thread_group.push_back(std::move(fun));
- }
- return 1;
- }
- void kill_thread(int count=1)
- {
- int size=control_thread.size();
-
- for(int i=0;i<count;i++)
- {
- std::get<0>(control_thread[size-1-i])=false; //线程,一种惰性关闭,如果线程处于阻塞的状态的时候会在下一轮运行完成后关闭
- control_thread.pop_back(); //从尾部把线程管理弹出
- thread_group.pop_back();
- }
- }
- ~basic_thread_pool()
- {
- int i=thread_group.size();
- if(i!=0)
- {
- for(i-1;i>=0;i--)
- {
- if(thread_group[i].joinable())
- {
- std::cout<<i<<std::endl;
- thread_group[i].join();
-
- }
- }
- }
- }
- };
- template<typename T>
- struct thread_group //构建线程组,每个线程组有自己的任务队列,任务使用双向链表的形式,当有需要的时候直接将整个链表全部移走
- {
- };
- template<typename T>
- class middle_pool //一个带有任务窃取的线程池
- {
- private:
- };
- }
- #endif
|