erebus

diff liberebus/src/threadpool.cc @ 26:c8a6fb04fefa

multithreaded edit
author John Tsiombikas <nuclear@member.fsf.org>
date Sun, 01 Jun 2014 19:19:40 +0300
parents 6ef4b10fa468
children 53a98c148bf8
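
Only the .cc side of the change is visible below, so here is roughly the class shape it implies — a hedged reconstruction, not the actual liberebus threadpool.h. The member and method names are taken from the diff itself; the include guard, the choice of std::list for the queue, and the mutable qualifier on the mutex (needed so the const accessors can lock it) are assumptions.

#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <list>

struct WorkItem {
	std::function<void ()> work, done;
};

class ThreadPool {
private:
	int num_threads;
	std::thread *thread;			// worker array of size num_threads
	std::list<WorkItem> workq;		// pending jobs, guarded by workq_mutex
	mutable std::mutex workq_mutex;		// mutable so the const accessors can lock it
	std::condition_variable workq_condvar;	// signaled when work arrives or on quit
	std::condition_variable done_condvar;	// signaled when a job finishes
	int qsize, nactive;
	bool quit;

	void thread_func();

public:
	explicit ThreadPool(int num_threads = -1);	// -1: use hardware_concurrency
	~ThreadPool();

	void add_work(std::function<void ()> func);
	void add_work(std::function<void ()> work_func, std::function<void ()> done_func);

	int queued() const;	// jobs waiting in the queue
	int active() const;	// jobs currently running
	int pending() const;	// queued + active

	long wait();		// block until pending() == 0, returns ms elapsed
	long wait(long timeout);	// same, but gives up after roughly timeout ms
};

#endif	// THREADPOOL_H_
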
line diff
     1.1 --- a/liberebus/src/threadpool.cc	Sat May 31 06:21:09 2014 +0300
     1.2 +++ b/liberebus/src/threadpool.cc	Sun Jun 01 19:19:40 2014 +0300
     1.3 @@ -1,59 +1,128 @@
     1.4 -#include "threadpool.h"
     1.5 -
     1.6 -ThreadPool::ThreadPool(int num_threads)
     1.7 -{
     1.8 -	quit = false;
     1.9 -
    1.10 -	if(num_threads == -1) {
    1.11 -		num_threads = std::thread::hardware_concurrency();
    1.12 -	}
    1.13 -
    1.14 -	printf("creating thread pool with %d threads\n", num_threads);
    1.15 -
    1.16 -	thread = new std::thread[num_threads];
    1.17 -	for(int i=0; i<num_threads; i++) {
    1.18 -		thread[i] = std::thread(&ThreadPool::thread_func, this);
    1.19 -	}
    1.20 -	this->num_threads = num_threads;
    1.21 -}
    1.22 -
    1.23 -ThreadPool::~ThreadPool()
    1.24 -{
    1.25 -	quit = true;
    1.26 -	condvar.notify_all();
    1.27 -
    1.28 -	printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
    1.29 -	fflush(stdout);
    1.30 -	for(int i=0; i<num_threads; i++) {
    1.31 -		thread[i].join();
    1.32 -		putchar('.');
    1.33 -		fflush(stdout);
    1.34 -	}
    1.35 -	putchar('\n');
    1.36 -}
    1.37 -
    1.38 -void ThreadPool::add_work(std::function<void ()> func)
    1.39 -{
    1.40 -	std::unique_lock<std::mutex> lock(workq_mutex);
    1.41 -	workq.push_back(func);
    1.42 -}
    1.43 -
    1.44 -void ThreadPool::thread_func()
    1.45 -{
    1.46 -	std::unique_lock<std::mutex> lock(workq_mutex);
    1.47 -	for(;;) {
    1.48 -		if(quit) break;
    1.49 -
    1.50 -		condvar.wait(lock);
    1.51 -
    1.52 -		if(!quit && !workq.empty()) {
    1.53 -			std::function<void ()> work = workq.front();
    1.54 -			workq.pop_front();
    1.55 -			lock.unlock();
    1.56 -
    1.57 -			work();
    1.58 -
    1.59 -			lock.lock();
    1.60 -		}
    1.61 -	}
    1.62 -}
    1.63 +#include <algorithm>
    1.64 +#include <chrono>
    1.65 +#include "threadpool.h"
    1.66 +
    1.67 +using namespace std::chrono;
    1.68 +
    1.69 +ThreadPool::ThreadPool(int num_threads)
    1.70 +{
    1.71 +	quit = false;
    1.72 +	qsize = 0;
    1.73 +	nactive = 0;
    1.74 +
    1.75 +	if(num_threads == -1) {
    1.76 +		num_threads = std::thread::hardware_concurrency();
    1.77 +	}
    1.78 +
    1.79 +	printf("creating thread pool with %d threads\n", num_threads);
    1.80 +
    1.81 +	thread = new std::thread[num_threads];
    1.82 +	for(int i=0; i<num_threads; i++) {
    1.83 +		thread[i] = std::thread(&ThreadPool::thread_func, this);
    1.84 +	}
    1.85 +	this->num_threads = num_threads;
    1.86 +}
    1.87 +
    1.88 +ThreadPool::~ThreadPool()
    1.89 +{
     1.90 +	{ std::lock_guard<std::mutex> lock(workq_mutex); quit = true; }	// set under the lock so a worker between its quit check and wait() can't miss the notify
    1.91 +	workq_condvar.notify_all();
    1.92 +
    1.93 +	printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
    1.94 +	fflush(stdout);
    1.95 +	for(int i=0; i<num_threads; i++) {
    1.96 +		thread[i].join();
    1.97 +		putchar('.');
    1.98 +		fflush(stdout);
    1.99 +	}
   1.100 +	putchar('\n');
   1.101 +}
   1.102 +
   1.103 +void ThreadPool::add_work(std::function<void ()> func)
   1.104 +{
   1.105 +	add_work(func, std::function<void ()>{});
   1.106 +}
   1.107 +
   1.108 +void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
   1.109 +{
   1.110 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.111 +	workq.push_back(WorkItem{work_func, done_func});
   1.112 +	++qsize;
   1.113 +	workq_condvar.notify_all();
   1.114 +}
   1.115 +
   1.116 +int ThreadPool::queued() const
   1.117 +{
   1.118 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.119 +	return qsize;
   1.120 +}
   1.121 +
   1.122 +int ThreadPool::active() const
   1.123 +{
   1.124 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.125 +	return nactive;
   1.126 +}
   1.127 +
   1.128 +int ThreadPool::pending() const
   1.129 +{
   1.130 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.131 +	return nactive + qsize;
   1.132 +}
   1.133 +
   1.134 +long ThreadPool::wait()
   1.135 +{
   1.136 +	auto start_time = steady_clock::now();
   1.137 +
   1.138 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.139 +	done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
   1.140 +
   1.141 +	auto dur = steady_clock::now() - start_time;
   1.142 +	return duration_cast<milliseconds>(dur).count();
   1.143 +}
   1.144 +
   1.145 +long ThreadPool::wait(long timeout)
   1.146 +{
   1.147 +	auto start_time = steady_clock::now();
    1.148 +	duration<long, std::milli> dur(0), timeout_dur(std::max(timeout, 5L));	// dur starts at 0 so it is defined even if the loop never runs
   1.149 +
   1.150 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.151 +	while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
   1.152 +		if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
   1.153 +			break;
   1.154 +		}
   1.155 +		dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
   1.156 +		timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
   1.157 +	}
   1.158 +
   1.159 +	/*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
   1.160 +			nactive, qsize, workq.empty() ? "true" : "false");*/
   1.161 +	return dur.count();
   1.162 +}
   1.163 +
   1.164 +void ThreadPool::thread_func()
   1.165 +{
   1.166 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.167 +	for(;;) {
   1.168 +		if(quit) break;
   1.169 +
   1.170 +		workq_condvar.wait(lock);
   1.171 +
   1.172 +		while(!quit && !workq.empty()) {
   1.173 +			WorkItem witem = workq.front();
   1.174 +			workq.pop_front();
   1.175 +			++nactive;
   1.176 +			--qsize;
   1.177 +			lock.unlock();
   1.178 +
   1.179 +			witem.work();
   1.180 +			if(witem.done) {
   1.181 +				witem.done();
   1.182 +			}
   1.183 +
   1.184 +			lock.lock();
   1.185 +			--nactive;
   1.186 +			done_condvar.notify_all();
   1.187 +		}
   1.188 +	}
   1.189 +}
   1.190 +
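
For context, this is how the reworked interface might be driven — a minimal sketch, not code from the repository: ThreadPool and its methods are as defined above, while main, the tile lambdas, and done_count are illustrative.

#include <cstdio>
#include <atomic>
#include "threadpool.h"

int main()
{
	ThreadPool pool(-1);		// -1 resolves to std::thread::hardware_concurrency()
	std::atomic<int> done_count(0);

	for(int i=0; i<16; i++) {
		pool.add_work(
			[i]() { printf("rendering tile %d\n", i); },	// work function
			[&]() { ++done_count; });	// completion callback, runs on the worker thread
	}

	// block until the queue drains and all workers go idle, or roughly 1000ms pass
	long waited = pool.wait(1000);
	printf("waited %ld ms, %d done, %d pending\n", waited, done_count.load(), pool.pending());

	pool.wait();	// untimed variant: returns only once pending() hits 0
	return 0;
}

Two details of the design are worth noting. The untimed wait() uses the predicate overload of condition_variable::wait, so spurious wakeups cannot return it early; it unblocks only once nactive is zero and the queue is empty, and workers notify done_condvar under the mutex after decrementing nactive, which keeps that check reliable. The timed wait(long) recomputes the remaining budget after every wakeup, so a notification that arrives while work is still pending does not reset the clock; the 5 ms floor just keeps wait_for from being called with a degenerate timeout.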