#ifndef thread_poll_ #define thread_poll_ #include #include #include #include #include #include #include #include #include"../basic/nanxing_operator_check.h" #include"../basic/nanxing_basic_signal_linklist.h" //任务链表 //要求使用std::package作为任务传递的包装器 namespace nanxing { template //一个不带有任务窃取的简单可伸缩队列(明天实现伸缩) class basic_thread_pool { static_assert(NANXING_BASIC_OPERATOR_(T,fun),"The type is illigel"); //验证T是否是一个可调用对象 private: std::condition_variable task_empty; std::mutex queue_mutex; //任务队列锁 nanxing::basic_signal_linklist tasks; //任务队列 int Max_size=1; //线程的最多数量,默认由硬件数量决定 std::vector thread_group; //线程池中的线程 std::vector> control_thread; //控制线程的开闭,true的时候对应的线程处于开启状态 private: void thread_function(bool& done,bool& situation) //线程工作函数,done从外部传入用于控制线程数量,状态用于指示线程是否完结 { T* task; while(!done) { { //std::cout<<"a"< ulk(queue_mutex); task_empty.wait(ulk,[this]{return (bool(tasks.list_size()));}); //如果任务队列为空则阻塞 if(done) //这个是必须的,因为在操作过程中有可能会被其他的线程改变 { break; } task=(tasks.pop()->data); //从队列中获取函数 ulk.unlock(); } (*task)(); } } public: //不允许移动和复制因为受到同步原语的限制,同步原语是不允许移动和复制的 basic_thread_pool(basic_thread_pool&)=delete; basic_thread_pool(basic_thread_pool&& others)=delete; basic_thread_pool& operator=(basic_thread_pool others)=delete; basic_thread_pool& operator=(const basic_thread_pool& others)=delete; public: basic_thread_pool() { Max_size=std::thread::hardware_concurrency()*2; } int init_pool() noexcept { int count; for(int i=0;i(control_thread[0]); bool&data_tm_1=std::get<1>(control_thread[0]); std::thread fun; Loop: try{ fun=std::thread(&basic_thread_pool::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1)); } catch(...) { if(count<=5) { count++; goto Loop; } else { std::cerr<<"thread_build_error"< lk(queue_mutex); mid=tasks.push(std::move(data)); if(tasks.list_size()==1) { flag=1; } } if(flag==1) { task_empty.notify_all(); } //如果任务队列非空就随机启动一个阻塞线程 return mid; } void close() { for(int i=0;i(control_thread[i])=true; } } int build_more_thread(size_t more=1) //如果不给出具体参数就赋值为1 { Max_size+=more; int count; for(int i=0;i(control_thread[0]); bool&data_tm_1=std::get<1>(control_thread[0]); std::thread fun; Loop: try{ fun=std::thread(&basic_thread_pool::thread_function,this,std::ref(data_tm_0),std::ref(data_tm_1)); } catch(...) { if(count<=5) { count++; goto Loop; } else { std::cerr<<"thread_build_error"<(control_thread[size-1-i])=false; //线程,一种惰性关闭,如果线程处于阻塞的状态的时候会在下一轮运行完成后关闭 control_thread.pop_back(); //从尾部把线程管理弹出 thread_group.pop_back(); } } ~basic_thread_pool() { int i=thread_group.size(); if(i!=0) { for(i-1;i>=0;i--) { if(thread_group[i].joinable()) { std::cout< struct thread_group //构建线程组,每个线程组有自己的任务队列,任务使用双向链表的形式,当有需要的时候直接将整个链表全部移走 { }; template class middle_pool //一个带有任务窃取的线程池 { private: }; } #endif