distray

diff src/threadpool.cc @ 0:cf494adee646

distance field raytracer
author John Tsiombikas <nuclear@member.fsf.org>
date Fri, 25 Dec 2015 05:41:10 +0200
parents
children
line diff
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/threadpool.cc	Fri Dec 25 05:41:10 2015 +0200
     1.3 @@ -0,0 +1,160 @@
     1.4 +#include <algorithm>
     1.5 +#include <chrono>
     1.6 +#include "threadpool.h"
     1.7 +
     1.8 +using namespace std::chrono;
     1.9 +
    1.10 +ThreadPool::ThreadPool(int num_threads)
    1.11 +{
    1.12 +	quit = false;
    1.13 +	qsize = 0;
    1.14 +	nactive = 0;
    1.15 +
    1.16 +	if(num_threads == -1) {
    1.17 +		num_threads = std::thread::hardware_concurrency();
    1.18 +	}
    1.19 +
    1.20 +	printf("creating thread pool with %d threads\n", num_threads);
    1.21 +
    1.22 +	thread = new std::thread[num_threads];
    1.23 +	for(int i=0; i<num_threads; i++) {
    1.24 +		thread[i] = std::thread(&ThreadPool::thread_func, this);
    1.25 +
    1.26 +#ifdef _MSC_VER
    1.27 +		/* detach the thread to avoid having to join them in the destructor, which
    1.28 +		 * causes a deadlock in msvc implementation when called after main returns
    1.29 +		 */
    1.30 +		thread[i].detach();
    1.31 +#endif
    1.32 +	}
    1.33 +	this->num_threads = num_threads;
    1.34 +}
    1.35 +
    1.36 +ThreadPool::~ThreadPool()
    1.37 +{
    1.38 +#ifdef _MSC_VER
    1.39 +	clear_work();
    1.40 +#endif
    1.41 +
    1.42 +	quit = true;
    1.43 +	workq_condvar.notify_all();
    1.44 +
    1.45 +	printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
    1.46 +	fflush(stdout);
    1.47 +#ifndef _MSC_VER
    1.48 +	for(int i=0; i<num_threads; i++) {
    1.49 +		thread[i].join();
    1.50 +		putchar('.');
    1.51 +		fflush(stdout);
    1.52 +	}
    1.53 +#else
    1.54 +	// spin until all threads are done...
    1.55 +	std::unique_lock<std::mutex> lock(workq_mutex);
    1.56 +	while(nactive > 0) {
    1.57 +		lock.unlock();
    1.58 +		std::this_thread::sleep_for(std::chrono::milliseconds(128));
    1.59 +		putchar('.');
    1.60 +		fflush(stdout);
    1.61 +		lock.lock();
    1.62 +	}
    1.63 +#endif	// _MSC_VER
    1.64 +
    1.65 +	putchar('\n');
    1.66 +	delete [] thread;
    1.67 +}
    1.68 +
    1.69 +void ThreadPool::add_work(std::function<void ()> func)
    1.70 +{
    1.71 +	add_work(func, std::function<void ()>{});
    1.72 +}
    1.73 +
    1.74 +void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
    1.75 +{
    1.76 +	std::unique_lock<std::mutex> lock(workq_mutex);
    1.77 +	workq.push_back(WorkItem{work_func, done_func});
    1.78 +	++qsize;
    1.79 +	workq_condvar.notify_all();
    1.80 +}
    1.81 +
    1.82 +void ThreadPool::clear_work()
    1.83 +{
    1.84 +	std::unique_lock<std::mutex> lock(workq_mutex);
    1.85 +	workq.clear();
    1.86 +	qsize = 0;
    1.87 +}
    1.88 +
    1.89 +int ThreadPool::queued() const
    1.90 +{
    1.91 +	std::unique_lock<std::mutex> lock(workq_mutex);
    1.92 +	return qsize;
    1.93 +}
    1.94 +
    1.95 +int ThreadPool::active() const
    1.96 +{
    1.97 +	std::unique_lock<std::mutex> lock(workq_mutex);
    1.98 +	return nactive;
    1.99 +}
   1.100 +
   1.101 +int ThreadPool::pending() const
   1.102 +{
   1.103 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.104 +	return nactive + qsize;
   1.105 +}
   1.106 +
   1.107 +long ThreadPool::wait()
   1.108 +{
   1.109 +	auto start_time = steady_clock::now();
   1.110 +
   1.111 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.112 +	done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
   1.113 +
   1.114 +	auto dur = steady_clock::now() - start_time;
   1.115 +	return duration_cast<milliseconds>(dur).count();
   1.116 +}
   1.117 +
   1.118 +long ThreadPool::wait(long timeout)
   1.119 +{
   1.120 +	auto start_time = steady_clock::now();
   1.121 +	duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
   1.122 +
   1.123 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.124 +	while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
   1.125 +		if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
   1.126 +			break;
   1.127 +		}
   1.128 +		dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
   1.129 +		timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
   1.130 +	}
   1.131 +
   1.132 +	/*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
   1.133 +			nactive, qsize, workq.empty() ? "true" : "false");*/
   1.134 +	return dur.count();
   1.135 +}
   1.136 +
   1.137 +void ThreadPool::thread_func()
   1.138 +{
   1.139 +	std::unique_lock<std::mutex> lock(workq_mutex);
   1.140 +	for(;;) {
   1.141 +		if(quit) break;
   1.142 +
   1.143 +		workq_condvar.wait(lock);
   1.144 +
   1.145 +		while(!quit && !workq.empty()) {
   1.146 +			WorkItem witem = workq.front();
   1.147 +			workq.pop_front();
   1.148 +			++nactive;
   1.149 +			--qsize;
   1.150 +			lock.unlock();
   1.151 +
   1.152 +			witem.work();
   1.153 +			if(witem.done) {
   1.154 +				witem.done();
   1.155 +			}
   1.156 +
   1.157 +			lock.lock();
   1.158 +			--nactive;
   1.159 +			done_condvar.notify_all();
   1.160 +		}
   1.161 +	}
   1.162 +}
   1.163 +