nuclear@26: #include nuclear@26: #include nuclear@26: #include "threadpool.h" nuclear@26: nuclear@26: using namespace std::chrono; nuclear@26: nuclear@26: ThreadPool::ThreadPool(int num_threads) nuclear@26: { nuclear@26: quit = false; nuclear@26: qsize = 0; nuclear@26: nactive = 0; nuclear@26: nuclear@26: if(num_threads == -1) { nuclear@26: num_threads = std::thread::hardware_concurrency(); nuclear@26: } nuclear@26: nuclear@26: printf("creating thread pool with %d threads\n", num_threads); nuclear@26: nuclear@26: thread = new std::thread[num_threads]; nuclear@26: for(int i=0; inum_threads = num_threads; nuclear@26: } nuclear@26: nuclear@26: ThreadPool::~ThreadPool() nuclear@26: { nuclear@31: #ifdef _MSC_VER nuclear@34: clear_work(); nuclear@31: #endif nuclear@31: nuclear@26: quit = true; nuclear@26: workq_condvar.notify_all(); nuclear@26: nuclear@26: printf("ThreadPool: waiting for %d worker threads to stop ", num_threads); nuclear@26: fflush(stdout); nuclear@31: #ifndef _MSC_VER nuclear@26: for(int i=0; i lock(workq_mutex); nuclear@31: while(nactive > 0) { nuclear@31: lock.unlock(); nuclear@31: std::this_thread::sleep_for(std::chrono::milliseconds(128)); nuclear@31: putchar('.'); nuclear@31: fflush(stdout); nuclear@31: lock.lock(); nuclear@31: } nuclear@31: #endif // _MSC_VER nuclear@31: nuclear@26: putchar('\n'); nuclear@31: delete [] thread; nuclear@26: } nuclear@26: nuclear@26: void ThreadPool::add_work(std::function func) nuclear@26: { nuclear@26: add_work(func, std::function{}); nuclear@26: } nuclear@26: nuclear@26: void ThreadPool::add_work(std::function work_func, std::function done_func) nuclear@26: { nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: workq.push_back(WorkItem{work_func, done_func}); nuclear@26: ++qsize; nuclear@26: workq_condvar.notify_all(); nuclear@26: } nuclear@26: nuclear@34: void ThreadPool::clear_work() nuclear@34: { nuclear@34: std::unique_lock lock(workq_mutex); nuclear@34: workq.clear(); nuclear@34: qsize = 0; nuclear@34: } nuclear@34: nuclear@26: int ThreadPool::queued() const nuclear@26: { nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: return qsize; nuclear@26: } nuclear@26: nuclear@26: int ThreadPool::active() const nuclear@26: { nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: return nactive; nuclear@26: } nuclear@26: nuclear@26: int ThreadPool::pending() const nuclear@26: { nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: return nactive + qsize; nuclear@26: } nuclear@26: nuclear@26: long ThreadPool::wait() nuclear@26: { nuclear@26: auto start_time = steady_clock::now(); nuclear@26: nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); }); nuclear@26: nuclear@26: auto dur = steady_clock::now() - start_time; nuclear@26: return duration_cast(dur).count(); nuclear@26: } nuclear@26: nuclear@26: long ThreadPool::wait(long timeout) nuclear@26: { nuclear@26: auto start_time = steady_clock::now(); nuclear@26: duration dur, timeout_dur(std::max(timeout, 5L)); nuclear@26: nuclear@26: std::unique_lock lock(workq_mutex); nuclear@26: while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) { nuclear@26: if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) { nuclear@26: break; nuclear@26: } nuclear@26: dur = duration_cast(steady_clock::now() - start_time); nuclear@26: timeout_dur = milliseconds(std::max(timeout, 5L)) - dur; nuclear@26: } nuclear@26: nuclear@26: /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout, nuclear@26: nactive, qsize, workq.empty() ? "true" : "false");*/ nuclear@26: return dur.count(); nuclear@26: } nuclear@26: nuclear@26: void ThreadPool::thread_func() nuclear@26: { nuclear@26: std::unique_lock lock(workq_mutex); nuclear@47: while(!quit) { nuclear@26: workq_condvar.wait(lock); nuclear@26: nuclear@26: while(!quit && !workq.empty()) { nuclear@26: WorkItem witem = workq.front(); nuclear@26: workq.pop_front(); nuclear@26: ++nactive; nuclear@26: --qsize; nuclear@26: lock.unlock(); nuclear@26: nuclear@26: witem.work(); nuclear@26: if(witem.done) { nuclear@26: witem.done(); nuclear@26: } nuclear@26: nuclear@26: lock.lock(); nuclear@26: --nactive; nuclear@26: done_condvar.notify_all(); nuclear@26: } nuclear@26: } nuclear@26: } nuclear@26: