distray
diff src/threadpool.cc @ 0:cf494adee646
distance field raytracer
author | John Tsiombikas <nuclear@member.fsf.org> |
---|---|
date | Fri, 25 Dec 2015 05:41:10 +0200 |
parents | |
children |
line diff
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/src/threadpool.cc Fri Dec 25 05:41:10 2015 +0200 1.3 @@ -0,0 +1,160 @@ 1.4 +#include <algorithm> 1.5 +#include <chrono> 1.6 +#include "threadpool.h" 1.7 + 1.8 +using namespace std::chrono; 1.9 + 1.10 +ThreadPool::ThreadPool(int num_threads) 1.11 +{ 1.12 + quit = false; 1.13 + qsize = 0; 1.14 + nactive = 0; 1.15 + 1.16 + if(num_threads == -1) { 1.17 + num_threads = std::thread::hardware_concurrency(); 1.18 + } 1.19 + 1.20 + printf("creating thread pool with %d threads\n", num_threads); 1.21 + 1.22 + thread = new std::thread[num_threads]; 1.23 + for(int i=0; i<num_threads; i++) { 1.24 + thread[i] = std::thread(&ThreadPool::thread_func, this); 1.25 + 1.26 +#ifdef _MSC_VER 1.27 + /* detach the thread to avoid having to join them in the destructor, which 1.28 + * causes a deadlock in msvc implementation when called after main returns 1.29 + */ 1.30 + thread[i].detach(); 1.31 +#endif 1.32 + } 1.33 + this->num_threads = num_threads; 1.34 +} 1.35 + 1.36 +ThreadPool::~ThreadPool() 1.37 +{ 1.38 +#ifdef _MSC_VER 1.39 + clear_work(); 1.40 +#endif 1.41 + 1.42 + quit = true; 1.43 + workq_condvar.notify_all(); 1.44 + 1.45 + printf("ThreadPool: waiting for %d worker threads to stop ", num_threads); 1.46 + fflush(stdout); 1.47 +#ifndef _MSC_VER 1.48 + for(int i=0; i<num_threads; i++) { 1.49 + thread[i].join(); 1.50 + putchar('.'); 1.51 + fflush(stdout); 1.52 + } 1.53 +#else 1.54 + // spin until all threads are done... 1.55 + std::unique_lock<std::mutex> lock(workq_mutex); 1.56 + while(nactive > 0) { 1.57 + lock.unlock(); 1.58 + std::this_thread::sleep_for(std::chrono::milliseconds(128)); 1.59 + putchar('.'); 1.60 + fflush(stdout); 1.61 + lock.lock(); 1.62 + } 1.63 +#endif // _MSC_VER 1.64 + 1.65 + putchar('\n'); 1.66 + delete [] thread; 1.67 +} 1.68 + 1.69 +void ThreadPool::add_work(std::function<void ()> func) 1.70 +{ 1.71 + add_work(func, std::function<void ()>{}); 1.72 +} 1.73 + 1.74 +void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func) 1.75 +{ 1.76 + std::unique_lock<std::mutex> lock(workq_mutex); 1.77 + workq.push_back(WorkItem{work_func, done_func}); 1.78 + ++qsize; 1.79 + workq_condvar.notify_all(); 1.80 +} 1.81 + 1.82 +void ThreadPool::clear_work() 1.83 +{ 1.84 + std::unique_lock<std::mutex> lock(workq_mutex); 1.85 + workq.clear(); 1.86 + qsize = 0; 1.87 +} 1.88 + 1.89 +int ThreadPool::queued() const 1.90 +{ 1.91 + std::unique_lock<std::mutex> lock(workq_mutex); 1.92 + return qsize; 1.93 +} 1.94 + 1.95 +int ThreadPool::active() const 1.96 +{ 1.97 + std::unique_lock<std::mutex> lock(workq_mutex); 1.98 + return nactive; 1.99 +} 1.100 + 1.101 +int ThreadPool::pending() const 1.102 +{ 1.103 + std::unique_lock<std::mutex> lock(workq_mutex); 1.104 + return nactive + qsize; 1.105 +} 1.106 + 1.107 +long ThreadPool::wait() 1.108 +{ 1.109 + auto start_time = steady_clock::now(); 1.110 + 1.111 + std::unique_lock<std::mutex> lock(workq_mutex); 1.112 + done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); }); 1.113 + 1.114 + auto dur = steady_clock::now() - start_time; 1.115 + return duration_cast<milliseconds>(dur).count(); 1.116 +} 1.117 + 1.118 +long ThreadPool::wait(long timeout) 1.119 +{ 1.120 + auto start_time = steady_clock::now(); 1.121 + duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L)); 1.122 + 1.123 + std::unique_lock<std::mutex> lock(workq_mutex); 1.124 + while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) { 1.125 + if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) { 1.126 + break; 1.127 + } 1.128 + dur = duration_cast<milliseconds>(steady_clock::now() - start_time); 1.129 + timeout_dur = milliseconds(std::max(timeout, 5L)) - dur; 1.130 + } 1.131 + 1.132 + /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout, 1.133 + nactive, qsize, workq.empty() ? "true" : "false");*/ 1.134 + return dur.count(); 1.135 +} 1.136 + 1.137 +void ThreadPool::thread_func() 1.138 +{ 1.139 + std::unique_lock<std::mutex> lock(workq_mutex); 1.140 + for(;;) { 1.141 + if(quit) break; 1.142 + 1.143 + workq_condvar.wait(lock); 1.144 + 1.145 + while(!quit && !workq.empty()) { 1.146 + WorkItem witem = workq.front(); 1.147 + workq.pop_front(); 1.148 + ++nactive; 1.149 + --qsize; 1.150 + lock.unlock(); 1.151 + 1.152 + witem.work(); 1.153 + if(witem.done) { 1.154 + witem.done(); 1.155 + } 1.156 + 1.157 + lock.lock(); 1.158 + --nactive; 1.159 + done_condvar.notify_all(); 1.160 + } 1.161 + } 1.162 +} 1.163 +