erebus
view liberebus/src/threadpool.cc @ 47:4b91c9a501d8
minor fix of strange code structure in the thread func
author:   John Tsiombikas <nuclear@member.fsf.org>
date:     Mon, 01 Feb 2016 14:56:15 +0200
parents:  d15ee526daa6
children: (none)
line source
1 #include <algorithm>
2 #include <chrono>
3 #include "threadpool.h"
5 using namespace std::chrono;
7 ThreadPool::ThreadPool(int num_threads)
8 {
9 quit = false;
10 qsize = 0;
11 nactive = 0;
13 if(num_threads == -1) {
14 num_threads = std::thread::hardware_concurrency();
15 }
17 printf("creating thread pool with %d threads\n", num_threads);
19 thread = new std::thread[num_threads];
20 for(int i=0; i<num_threads; i++) {
21 thread[i] = std::thread(&ThreadPool::thread_func, this);
23 #ifdef _MSC_VER
24 /* detach the thread to avoid having to join them in the destructor, which
25 * causes a deadlock in msvc implementation when called after main returns
26 */
27 thread[i].detach();
28 #endif
29 }
30 this->num_threads = num_threads;
31 }
33 ThreadPool::~ThreadPool()
34 {
35 #ifdef _MSC_VER
36 clear_work();
37 #endif
39 quit = true;
40 workq_condvar.notify_all();
42 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
43 fflush(stdout);
44 #ifndef _MSC_VER
45 for(int i=0; i<num_threads; i++) {
46 thread[i].join();
47 putchar('.');
48 fflush(stdout);
49 }
50 #else
51 // spin until all threads are done...
52 std::unique_lock<std::mutex> lock(workq_mutex);
53 while(nactive > 0) {
54 lock.unlock();
55 std::this_thread::sleep_for(std::chrono::milliseconds(128));
56 putchar('.');
57 fflush(stdout);
58 lock.lock();
59 }
60 #endif // _MSC_VER
62 putchar('\n');
63 delete [] thread;
64 }
66 void ThreadPool::add_work(std::function<void ()> func)
67 {
68 add_work(func, std::function<void ()>{});
69 }
71 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
72 {
73 std::unique_lock<std::mutex> lock(workq_mutex);
74 workq.push_back(WorkItem{work_func, done_func});
75 ++qsize;
76 workq_condvar.notify_all();
77 }
79 void ThreadPool::clear_work()
80 {
81 std::unique_lock<std::mutex> lock(workq_mutex);
82 workq.clear();
83 qsize = 0;
84 }
86 int ThreadPool::queued() const
87 {
88 std::unique_lock<std::mutex> lock(workq_mutex);
89 return qsize;
90 }
92 int ThreadPool::active() const
93 {
94 std::unique_lock<std::mutex> lock(workq_mutex);
95 return nactive;
96 }
98 int ThreadPool::pending() const
99 {
100 std::unique_lock<std::mutex> lock(workq_mutex);
101 return nactive + qsize;
102 }
104 long ThreadPool::wait()
105 {
106 auto start_time = steady_clock::now();
108 std::unique_lock<std::mutex> lock(workq_mutex);
109 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
111 auto dur = steady_clock::now() - start_time;
112 return duration_cast<milliseconds>(dur).count();
113 }
115 long ThreadPool::wait(long timeout)
116 {
117 auto start_time = steady_clock::now();
118 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
120 std::unique_lock<std::mutex> lock(workq_mutex);
121 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
122 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
123 break;
124 }
125 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
126 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
127 }
129 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
130 nactive, qsize, workq.empty() ? "true" : "false");*/
131 return dur.count();
132 }
134 void ThreadPool::thread_func()
135 {
136 std::unique_lock<std::mutex> lock(workq_mutex);
137 while(!quit) {
138 workq_condvar.wait(lock);
140 while(!quit && !workq.empty()) {
141 WorkItem witem = workq.front();
142 workq.pop_front();
143 ++nactive;
144 --qsize;
145 lock.unlock();
147 witem.work();
148 if(witem.done) {
149 witem.done();
150 }
152 lock.lock();
153 --nactive;
154 done_condvar.notify_all();
155 }
156 }
157 }