erebus

view liberebus/src/threadpool.cc @ 31:53a98c148bf8

- introduced SurfaceGeometry to carry all the geometric information input to BRDF sampling and evaluation functions. - made Reflectance keep an (optional) pointer to its material - simplified PhongRefl::sample_dir, with the help of SurfaceGeometry - worked around microsoft's broken std::thread implementation's deadlock on join
author John Tsiombikas <nuclear@member.fsf.org>
date Sat, 07 Jun 2014 09:14:17 +0300
parents c8a6fb04fefa
children b1fc96c71bcc
line source
1 #include <algorithm>
2 #include <chrono>
3 #include "threadpool.h"
5 using namespace std::chrono;
7 ThreadPool::ThreadPool(int num_threads)
8 {
9 quit = false;
10 qsize = 0;
11 nactive = 0;
13 if(num_threads == -1) {
14 num_threads = std::thread::hardware_concurrency();
15 }
17 printf("creating thread pool with %d threads\n", num_threads);
19 thread = new std::thread[num_threads];
20 for(int i=0; i<num_threads; i++) {
21 thread[i] = std::thread(&ThreadPool::thread_func, this);
23 #ifdef _MSC_VER
24 /* detach the thread to avoid having to join them in the destructor, which
25 * causes a deadlock in msvc implementation when called after main returns
26 */
27 thread[i].detach();
28 #endif
29 }
30 this->num_threads = num_threads;
31 }
33 ThreadPool::~ThreadPool()
34 {
35 #ifdef _MSC_VER
36 workq_mutex.lock();
37 workq.clear();
38 qsize = 0;
39 workq_mutex.unlock();
40 #endif
42 quit = true;
43 workq_condvar.notify_all();
45 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
46 fflush(stdout);
47 #ifndef _MSC_VER
48 for(int i=0; i<num_threads; i++) {
49 thread[i].join();
50 putchar('.');
51 fflush(stdout);
52 }
54 #else
55 // spin until all threads are done...
56 std::unique_lock<std::mutex> lock(workq_mutex);
57 while(nactive > 0) {
58 lock.unlock();
59 std::this_thread::sleep_for(std::chrono::milliseconds(128));
60 putchar('.');
61 fflush(stdout);
62 lock.lock();
63 }
64 #endif // _MSC_VER
66 putchar('\n');
67 delete [] thread;
68 }
70 void ThreadPool::add_work(std::function<void ()> func)
71 {
72 add_work(func, std::function<void ()>{});
73 }
75 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
76 {
77 std::unique_lock<std::mutex> lock(workq_mutex);
78 workq.push_back(WorkItem{work_func, done_func});
79 ++qsize;
80 workq_condvar.notify_all();
81 }
83 int ThreadPool::queued() const
84 {
85 std::unique_lock<std::mutex> lock(workq_mutex);
86 return qsize;
87 }
89 int ThreadPool::active() const
90 {
91 std::unique_lock<std::mutex> lock(workq_mutex);
92 return nactive;
93 }
95 int ThreadPool::pending() const
96 {
97 std::unique_lock<std::mutex> lock(workq_mutex);
98 return nactive + qsize;
99 }
101 long ThreadPool::wait()
102 {
103 auto start_time = steady_clock::now();
105 std::unique_lock<std::mutex> lock(workq_mutex);
106 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
108 auto dur = steady_clock::now() - start_time;
109 return duration_cast<milliseconds>(dur).count();
110 }
112 long ThreadPool::wait(long timeout)
113 {
114 auto start_time = steady_clock::now();
115 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
117 std::unique_lock<std::mutex> lock(workq_mutex);
118 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
119 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
120 break;
121 }
122 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
123 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
124 }
126 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
127 nactive, qsize, workq.empty() ? "true" : "false");*/
128 return dur.count();
129 }
131 void ThreadPool::thread_func()
132 {
133 std::unique_lock<std::mutex> lock(workq_mutex);
134 for(;;) {
135 if(quit) break;
137 workq_condvar.wait(lock);
139 while(!quit && !workq.empty()) {
140 WorkItem witem = workq.front();
141 workq.pop_front();
142 ++nactive;
143 --qsize;
144 lock.unlock();
146 witem.work();
147 if(witem.done) {
148 witem.done();
149 }
151 lock.lock();
152 --nactive;
153 done_condvar.notify_all();
154 }
155 }
156 }