distray

annotate src/threadpool.cc @ 0:cf494adee646

distance field raytracer
author John Tsiombikas <nuclear@member.fsf.org>
date Fri, 25 Dec 2015 05:41:10 +0200
parents
children
rev   line source
nuclear@0 1 #include <algorithm>
nuclear@0 2 #include <chrono>
nuclear@0 3 #include "threadpool.h"
nuclear@0 4
nuclear@0 5 using namespace std::chrono;
nuclear@0 6
nuclear@0 7 ThreadPool::ThreadPool(int num_threads)
nuclear@0 8 {
nuclear@0 9 quit = false;
nuclear@0 10 qsize = 0;
nuclear@0 11 nactive = 0;
nuclear@0 12
nuclear@0 13 if(num_threads == -1) {
nuclear@0 14 num_threads = std::thread::hardware_concurrency();
nuclear@0 15 }
nuclear@0 16
nuclear@0 17 printf("creating thread pool with %d threads\n", num_threads);
nuclear@0 18
nuclear@0 19 thread = new std::thread[num_threads];
nuclear@0 20 for(int i=0; i<num_threads; i++) {
nuclear@0 21 thread[i] = std::thread(&ThreadPool::thread_func, this);
nuclear@0 22
nuclear@0 23 #ifdef _MSC_VER
nuclear@0 24 /* detach the thread to avoid having to join them in the destructor, which
nuclear@0 25 * causes a deadlock in msvc implementation when called after main returns
nuclear@0 26 */
nuclear@0 27 thread[i].detach();
nuclear@0 28 #endif
nuclear@0 29 }
nuclear@0 30 this->num_threads = num_threads;
nuclear@0 31 }
nuclear@0 32
nuclear@0 33 ThreadPool::~ThreadPool()
nuclear@0 34 {
nuclear@0 35 #ifdef _MSC_VER
nuclear@0 36 clear_work();
nuclear@0 37 #endif
nuclear@0 38
nuclear@0 39 quit = true;
nuclear@0 40 workq_condvar.notify_all();
nuclear@0 41
nuclear@0 42 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
nuclear@0 43 fflush(stdout);
nuclear@0 44 #ifndef _MSC_VER
nuclear@0 45 for(int i=0; i<num_threads; i++) {
nuclear@0 46 thread[i].join();
nuclear@0 47 putchar('.');
nuclear@0 48 fflush(stdout);
nuclear@0 49 }
nuclear@0 50 #else
nuclear@0 51 // spin until all threads are done...
nuclear@0 52 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 53 while(nactive > 0) {
nuclear@0 54 lock.unlock();
nuclear@0 55 std::this_thread::sleep_for(std::chrono::milliseconds(128));
nuclear@0 56 putchar('.');
nuclear@0 57 fflush(stdout);
nuclear@0 58 lock.lock();
nuclear@0 59 }
nuclear@0 60 #endif // _MSC_VER
nuclear@0 61
nuclear@0 62 putchar('\n');
nuclear@0 63 delete [] thread;
nuclear@0 64 }
nuclear@0 65
nuclear@0 66 void ThreadPool::add_work(std::function<void ()> func)
nuclear@0 67 {
nuclear@0 68 add_work(func, std::function<void ()>{});
nuclear@0 69 }
nuclear@0 70
nuclear@0 71 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
nuclear@0 72 {
nuclear@0 73 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 74 workq.push_back(WorkItem{work_func, done_func});
nuclear@0 75 ++qsize;
nuclear@0 76 workq_condvar.notify_all();
nuclear@0 77 }
nuclear@0 78
nuclear@0 79 void ThreadPool::clear_work()
nuclear@0 80 {
nuclear@0 81 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 82 workq.clear();
nuclear@0 83 qsize = 0;
nuclear@0 84 }
nuclear@0 85
nuclear@0 86 int ThreadPool::queued() const
nuclear@0 87 {
nuclear@0 88 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 89 return qsize;
nuclear@0 90 }
nuclear@0 91
nuclear@0 92 int ThreadPool::active() const
nuclear@0 93 {
nuclear@0 94 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 95 return nactive;
nuclear@0 96 }
nuclear@0 97
nuclear@0 98 int ThreadPool::pending() const
nuclear@0 99 {
nuclear@0 100 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 101 return nactive + qsize;
nuclear@0 102 }
nuclear@0 103
nuclear@0 104 long ThreadPool::wait()
nuclear@0 105 {
nuclear@0 106 auto start_time = steady_clock::now();
nuclear@0 107
nuclear@0 108 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 109 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
nuclear@0 110
nuclear@0 111 auto dur = steady_clock::now() - start_time;
nuclear@0 112 return duration_cast<milliseconds>(dur).count();
nuclear@0 113 }
nuclear@0 114
nuclear@0 115 long ThreadPool::wait(long timeout)
nuclear@0 116 {
nuclear@0 117 auto start_time = steady_clock::now();
nuclear@0 118 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
nuclear@0 119
nuclear@0 120 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 121 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
nuclear@0 122 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
nuclear@0 123 break;
nuclear@0 124 }
nuclear@0 125 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
nuclear@0 126 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
nuclear@0 127 }
nuclear@0 128
nuclear@0 129 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
nuclear@0 130 nactive, qsize, workq.empty() ? "true" : "false");*/
nuclear@0 131 return dur.count();
nuclear@0 132 }
nuclear@0 133
nuclear@0 134 void ThreadPool::thread_func()
nuclear@0 135 {
nuclear@0 136 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@0 137 for(;;) {
nuclear@0 138 if(quit) break;
nuclear@0 139
nuclear@0 140 workq_condvar.wait(lock);
nuclear@0 141
nuclear@0 142 while(!quit && !workq.empty()) {
nuclear@0 143 WorkItem witem = workq.front();
nuclear@0 144 workq.pop_front();
nuclear@0 145 ++nactive;
nuclear@0 146 --qsize;
nuclear@0 147 lock.unlock();
nuclear@0 148
nuclear@0 149 witem.work();
nuclear@0 150 if(witem.done) {
nuclear@0 151 witem.done();
nuclear@0 152 }
nuclear@0 153
nuclear@0 154 lock.lock();
nuclear@0 155 --nactive;
nuclear@0 156 done_condvar.notify_all();
nuclear@0 157 }
nuclear@0 158 }
nuclear@0 159 }
nuclear@0 160