erebus
diff liberebus/src/threadpool.cc @ 26:c8a6fb04fefa
multithreadededit
author | John Tsiombikas <nuclear@member.fsf.org> |
---|---|
date | Sun, 01 Jun 2014 19:19:40 +0300 |
parents | 6ef4b10fa468 |
children | 53a98c148bf8 |
line diff
1.1 --- a/liberebus/src/threadpool.cc Sat May 31 06:21:09 2014 +0300 1.2 +++ b/liberebus/src/threadpool.cc Sun Jun 01 19:19:40 2014 +0300 1.3 @@ -1,59 +1,128 @@ 1.4 -#include "threadpool.h" 1.5 - 1.6 -ThreadPool::ThreadPool(int num_threads) 1.7 -{ 1.8 - quit = false; 1.9 - 1.10 - if(num_threads == -1) { 1.11 - num_threads = std::thread::hardware_concurrency(); 1.12 - } 1.13 - 1.14 - printf("creating thread pool with %d threads\n", num_threads); 1.15 - 1.16 - thread = new std::thread[num_threads]; 1.17 - for(int i=0; i<num_threads; i++) { 1.18 - thread[i] = std::thread(&ThreadPool::thread_func, this); 1.19 - } 1.20 - this->num_threads = num_threads; 1.21 -} 1.22 - 1.23 -ThreadPool::~ThreadPool() 1.24 -{ 1.25 - quit = true; 1.26 - condvar.notify_all(); 1.27 - 1.28 - printf("ThreadPool: waiting for %d worker threads to stop ", num_threads); 1.29 - fflush(stdout); 1.30 - for(int i=0; i<num_threads; i++) { 1.31 - thread[i].join(); 1.32 - putchar('.'); 1.33 - fflush(stdout); 1.34 - } 1.35 - putchar('\n'); 1.36 -} 1.37 - 1.38 -void ThreadPool::add_work(std::function<void ()> func) 1.39 -{ 1.40 - std::unique_lock<std::mutex> lock(workq_mutex); 1.41 - workq.push_back(func); 1.42 -} 1.43 - 1.44 -void ThreadPool::thread_func() 1.45 -{ 1.46 - std::unique_lock<std::mutex> lock(workq_mutex); 1.47 - for(;;) { 1.48 - if(quit) break; 1.49 - 1.50 - condvar.wait(lock); 1.51 - 1.52 - if(!quit && !workq.empty()) { 1.53 - std::function<void ()> work = workq.front(); 1.54 - workq.pop_front(); 1.55 - lock.unlock(); 1.56 - 1.57 - work(); 1.58 - 1.59 - lock.lock(); 1.60 - } 1.61 - } 1.62 -} 1.63 +#include <algorithm> 1.64 +#include <chrono> 1.65 +#include "threadpool.h" 1.66 + 1.67 +using namespace std::chrono; 1.68 + 1.69 +ThreadPool::ThreadPool(int num_threads) 1.70 +{ 1.71 + quit = false; 1.72 + qsize = 0; 1.73 + nactive = 0; 1.74 + 1.75 + if(num_threads == -1) { 1.76 + num_threads = std::thread::hardware_concurrency(); 1.77 + } 1.78 + 1.79 + printf("creating thread pool with %d threads\n", num_threads); 1.80 + 1.81 + thread = new std::thread[num_threads]; 1.82 + for(int i=0; i<num_threads; i++) { 1.83 + thread[i] = std::thread(&ThreadPool::thread_func, this); 1.84 + } 1.85 + this->num_threads = num_threads; 1.86 +} 1.87 + 1.88 +ThreadPool::~ThreadPool() 1.89 +{ 1.90 + quit = true; 1.91 + workq_condvar.notify_all(); 1.92 + 1.93 + printf("ThreadPool: waiting for %d worker threads to stop ", num_threads); 1.94 + fflush(stdout); 1.95 + for(int i=0; i<num_threads; i++) { 1.96 + thread[i].join(); 1.97 + putchar('.'); 1.98 + fflush(stdout); 1.99 + } 1.100 + putchar('\n'); 1.101 +} 1.102 + 1.103 +void ThreadPool::add_work(std::function<void ()> func) 1.104 +{ 1.105 + add_work(func, std::function<void ()>{}); 1.106 +} 1.107 + 1.108 +void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func) 1.109 +{ 1.110 + std::unique_lock<std::mutex> lock(workq_mutex); 1.111 + workq.push_back(WorkItem{work_func, done_func}); 1.112 + ++qsize; 1.113 + workq_condvar.notify_all(); 1.114 +} 1.115 + 1.116 +int ThreadPool::queued() const 1.117 +{ 1.118 + std::unique_lock<std::mutex> lock(workq_mutex); 1.119 + return qsize; 1.120 +} 1.121 + 1.122 +int ThreadPool::active() const 1.123 +{ 1.124 + std::unique_lock<std::mutex> lock(workq_mutex); 1.125 + return nactive; 1.126 +} 1.127 + 1.128 +int ThreadPool::pending() const 1.129 +{ 1.130 + std::unique_lock<std::mutex> lock(workq_mutex); 1.131 + return nactive + qsize; 1.132 +} 1.133 + 1.134 +long ThreadPool::wait() 1.135 +{ 1.136 + auto start_time = steady_clock::now(); 1.137 + 1.138 + std::unique_lock<std::mutex> lock(workq_mutex); 1.139 + done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); }); 1.140 + 1.141 + auto dur = steady_clock::now() - start_time; 1.142 + return duration_cast<milliseconds>(dur).count(); 1.143 +} 1.144 + 1.145 +long ThreadPool::wait(long timeout) 1.146 +{ 1.147 + auto start_time = steady_clock::now(); 1.148 + duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L)); 1.149 + 1.150 + std::unique_lock<std::mutex> lock(workq_mutex); 1.151 + while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) { 1.152 + if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) { 1.153 + break; 1.154 + } 1.155 + dur = duration_cast<milliseconds>(steady_clock::now() - start_time); 1.156 + timeout_dur = milliseconds(std::max(timeout, 5L)) - dur; 1.157 + } 1.158 + 1.159 + /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout, 1.160 + nactive, qsize, workq.empty() ? "true" : "false");*/ 1.161 + return dur.count(); 1.162 +} 1.163 + 1.164 +void ThreadPool::thread_func() 1.165 +{ 1.166 + std::unique_lock<std::mutex> lock(workq_mutex); 1.167 + for(;;) { 1.168 + if(quit) break; 1.169 + 1.170 + workq_condvar.wait(lock); 1.171 + 1.172 + while(!quit && !workq.empty()) { 1.173 + WorkItem witem = workq.front(); 1.174 + workq.pop_front(); 1.175 + ++nactive; 1.176 + --qsize; 1.177 + lock.unlock(); 1.178 + 1.179 + witem.work(); 1.180 + if(witem.done) { 1.181 + witem.done(); 1.182 + } 1.183 + 1.184 + lock.lock(); 1.185 + --nactive; 1.186 + done_condvar.notify_all(); 1.187 + } 1.188 + } 1.189 +} 1.190 +