erebus

annotate liberebus/src/threadpool.cc @ 31:53a98c148bf8

- introduced SurfaceGeometry to carry all the geometric information input to BRDF sampling and evaluation functions. - made Reflectance keep an (optional) pointer to its material - simplified PhongRefl::sample_dir, with the help of SurfaceGeometry - worked around microsoft's broken std::thread implementation's deadlock on join
author John Tsiombikas <nuclear@member.fsf.org>
date Sat, 07 Jun 2014 09:14:17 +0300
parents c8a6fb04fefa
children b1fc96c71bcc
rev   line source
nuclear@26 1 #include <algorithm>
nuclear@26 2 #include <chrono>
nuclear@26 3 #include "threadpool.h"
nuclear@26 4
nuclear@26 5 using namespace std::chrono;
nuclear@26 6
nuclear@26 7 ThreadPool::ThreadPool(int num_threads)
nuclear@26 8 {
nuclear@26 9 quit = false;
nuclear@26 10 qsize = 0;
nuclear@26 11 nactive = 0;
nuclear@26 12
nuclear@26 13 if(num_threads == -1) {
nuclear@26 14 num_threads = std::thread::hardware_concurrency();
nuclear@26 15 }
nuclear@26 16
nuclear@26 17 printf("creating thread pool with %d threads\n", num_threads);
nuclear@26 18
nuclear@26 19 thread = new std::thread[num_threads];
nuclear@26 20 for(int i=0; i<num_threads; i++) {
nuclear@26 21 thread[i] = std::thread(&ThreadPool::thread_func, this);
nuclear@31 22
nuclear@31 23 #ifdef _MSC_VER
nuclear@31 24 /* detach the thread to avoid having to join them in the destructor, which
nuclear@31 25 * causes a deadlock in msvc implementation when called after main returns
nuclear@31 26 */
nuclear@31 27 thread[i].detach();
nuclear@31 28 #endif
nuclear@26 29 }
nuclear@26 30 this->num_threads = num_threads;
nuclear@26 31 }
nuclear@26 32
nuclear@26 33 ThreadPool::~ThreadPool()
nuclear@26 34 {
nuclear@31 35 #ifdef _MSC_VER
nuclear@31 36 workq_mutex.lock();
nuclear@31 37 workq.clear();
nuclear@31 38 qsize = 0;
nuclear@31 39 workq_mutex.unlock();
nuclear@31 40 #endif
nuclear@31 41
nuclear@26 42 quit = true;
nuclear@26 43 workq_condvar.notify_all();
nuclear@26 44
nuclear@26 45 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
nuclear@26 46 fflush(stdout);
nuclear@31 47 #ifndef _MSC_VER
nuclear@26 48 for(int i=0; i<num_threads; i++) {
nuclear@26 49 thread[i].join();
nuclear@26 50 putchar('.');
nuclear@26 51 fflush(stdout);
nuclear@26 52 }
nuclear@31 53
nuclear@31 54 #else
nuclear@31 55 // spin until all threads are done...
nuclear@31 56 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@31 57 while(nactive > 0) {
nuclear@31 58 lock.unlock();
nuclear@31 59 std::this_thread::sleep_for(std::chrono::milliseconds(128));
nuclear@31 60 putchar('.');
nuclear@31 61 fflush(stdout);
nuclear@31 62 lock.lock();
nuclear@31 63 }
nuclear@31 64 #endif // _MSC_VER
nuclear@31 65
nuclear@26 66 putchar('\n');
nuclear@31 67 delete [] thread;
nuclear@26 68 }
nuclear@26 69
nuclear@26 70 void ThreadPool::add_work(std::function<void ()> func)
nuclear@26 71 {
nuclear@26 72 add_work(func, std::function<void ()>{});
nuclear@26 73 }
nuclear@26 74
nuclear@26 75 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
nuclear@26 76 {
nuclear@26 77 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 78 workq.push_back(WorkItem{work_func, done_func});
nuclear@26 79 ++qsize;
nuclear@26 80 workq_condvar.notify_all();
nuclear@26 81 }
nuclear@26 82
nuclear@26 83 int ThreadPool::queued() const
nuclear@26 84 {
nuclear@26 85 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 86 return qsize;
nuclear@26 87 }
nuclear@26 88
nuclear@26 89 int ThreadPool::active() const
nuclear@26 90 {
nuclear@26 91 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 92 return nactive;
nuclear@26 93 }
nuclear@26 94
nuclear@26 95 int ThreadPool::pending() const
nuclear@26 96 {
nuclear@26 97 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 98 return nactive + qsize;
nuclear@26 99 }
nuclear@26 100
nuclear@26 101 long ThreadPool::wait()
nuclear@26 102 {
nuclear@26 103 auto start_time = steady_clock::now();
nuclear@26 104
nuclear@26 105 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 106 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
nuclear@26 107
nuclear@26 108 auto dur = steady_clock::now() - start_time;
nuclear@26 109 return duration_cast<milliseconds>(dur).count();
nuclear@26 110 }
nuclear@26 111
nuclear@26 112 long ThreadPool::wait(long timeout)
nuclear@26 113 {
nuclear@26 114 auto start_time = steady_clock::now();
nuclear@26 115 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
nuclear@26 116
nuclear@26 117 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 118 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
nuclear@26 119 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
nuclear@26 120 break;
nuclear@26 121 }
nuclear@26 122 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
nuclear@26 123 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
nuclear@26 124 }
nuclear@26 125
nuclear@26 126 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
nuclear@26 127 nactive, qsize, workq.empty() ? "true" : "false");*/
nuclear@26 128 return dur.count();
nuclear@26 129 }
nuclear@26 130
nuclear@26 131 void ThreadPool::thread_func()
nuclear@26 132 {
nuclear@26 133 std::unique_lock<std::mutex> lock(workq_mutex);
nuclear@26 134 for(;;) {
nuclear@26 135 if(quit) break;
nuclear@26 136
nuclear@26 137 workq_condvar.wait(lock);
nuclear@26 138
nuclear@26 139 while(!quit && !workq.empty()) {
nuclear@26 140 WorkItem witem = workq.front();
nuclear@26 141 workq.pop_front();
nuclear@26 142 ++nactive;
nuclear@26 143 --qsize;
nuclear@26 144 lock.unlock();
nuclear@26 145
nuclear@26 146 witem.work();
nuclear@26 147 if(witem.done) {
nuclear@26 148 witem.done();
nuclear@26 149 }
nuclear@26 150
nuclear@26 151 lock.lock();
nuclear@26 152 --nactive;
nuclear@26 153 done_condvar.notify_all();
nuclear@26 154 }
nuclear@26 155 }
nuclear@26 156 }
nuclear@26 157