rev |
line source |
nuclear@26
|
1 #include <algorithm>
|
nuclear@26
|
2 #include <chrono>
|
nuclear@26
|
3 #include "threadpool.h"
|
nuclear@26
|
4
|
nuclear@26
|
5 using namespace std::chrono;
|
nuclear@26
|
6
|
nuclear@26
|
7 ThreadPool::ThreadPool(int num_threads)
|
nuclear@26
|
8 {
|
nuclear@26
|
9 quit = false;
|
nuclear@26
|
10 qsize = 0;
|
nuclear@26
|
11 nactive = 0;
|
nuclear@26
|
12
|
nuclear@26
|
13 if(num_threads == -1) {
|
nuclear@26
|
14 num_threads = std::thread::hardware_concurrency();
|
nuclear@26
|
15 }
|
nuclear@26
|
16
|
nuclear@26
|
17 printf("creating thread pool with %d threads\n", num_threads);
|
nuclear@26
|
18
|
nuclear@26
|
19 thread = new std::thread[num_threads];
|
nuclear@26
|
20 for(int i=0; i<num_threads; i++) {
|
nuclear@26
|
21 thread[i] = std::thread(&ThreadPool::thread_func, this);
|
nuclear@31
|
22
|
nuclear@31
|
23 #ifdef _MSC_VER
|
nuclear@31
|
24 /* detach the thread to avoid having to join them in the destructor, which
|
nuclear@31
|
25 * causes a deadlock in msvc implementation when called after main returns
|
nuclear@31
|
26 */
|
nuclear@31
|
27 thread[i].detach();
|
nuclear@31
|
28 #endif
|
nuclear@26
|
29 }
|
nuclear@26
|
30 this->num_threads = num_threads;
|
nuclear@26
|
31 }
|
nuclear@26
|
32
|
nuclear@26
|
33 ThreadPool::~ThreadPool()
|
nuclear@26
|
34 {
|
nuclear@31
|
35 #ifdef _MSC_VER
|
nuclear@34
|
36 clear_work();
|
nuclear@31
|
37 #endif
|
nuclear@31
|
38
|
nuclear@26
|
39 quit = true;
|
nuclear@26
|
40 workq_condvar.notify_all();
|
nuclear@26
|
41
|
nuclear@26
|
42 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
|
nuclear@26
|
43 fflush(stdout);
|
nuclear@31
|
44 #ifndef _MSC_VER
|
nuclear@26
|
45 for(int i=0; i<num_threads; i++) {
|
nuclear@26
|
46 thread[i].join();
|
nuclear@26
|
47 putchar('.');
|
nuclear@26
|
48 fflush(stdout);
|
nuclear@26
|
49 }
|
nuclear@31
|
50 #else
|
nuclear@31
|
51 // spin until all threads are done...
|
nuclear@31
|
52 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@31
|
53 while(nactive > 0) {
|
nuclear@31
|
54 lock.unlock();
|
nuclear@31
|
55 std::this_thread::sleep_for(std::chrono::milliseconds(128));
|
nuclear@31
|
56 putchar('.');
|
nuclear@31
|
57 fflush(stdout);
|
nuclear@31
|
58 lock.lock();
|
nuclear@31
|
59 }
|
nuclear@31
|
60 #endif // _MSC_VER
|
nuclear@31
|
61
|
nuclear@26
|
62 putchar('\n');
|
nuclear@31
|
63 delete [] thread;
|
nuclear@26
|
64 }
|
nuclear@26
|
65
|
nuclear@26
|
66 void ThreadPool::add_work(std::function<void ()> func)
|
nuclear@26
|
67 {
|
nuclear@26
|
68 add_work(func, std::function<void ()>{});
|
nuclear@26
|
69 }
|
nuclear@26
|
70
|
nuclear@26
|
71 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
|
nuclear@26
|
72 {
|
nuclear@26
|
73 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
74 workq.push_back(WorkItem{work_func, done_func});
|
nuclear@26
|
75 ++qsize;
|
nuclear@26
|
76 workq_condvar.notify_all();
|
nuclear@26
|
77 }
|
nuclear@26
|
78
|
nuclear@34
|
79 void ThreadPool::clear_work()
|
nuclear@34
|
80 {
|
nuclear@34
|
81 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@34
|
82 workq.clear();
|
nuclear@34
|
83 qsize = 0;
|
nuclear@34
|
84 }
|
nuclear@34
|
85
|
nuclear@26
|
86 int ThreadPool::queued() const
|
nuclear@26
|
87 {
|
nuclear@26
|
88 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
89 return qsize;
|
nuclear@26
|
90 }
|
nuclear@26
|
91
|
nuclear@26
|
92 int ThreadPool::active() const
|
nuclear@26
|
93 {
|
nuclear@26
|
94 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
95 return nactive;
|
nuclear@26
|
96 }
|
nuclear@26
|
97
|
nuclear@26
|
98 int ThreadPool::pending() const
|
nuclear@26
|
99 {
|
nuclear@26
|
100 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
101 return nactive + qsize;
|
nuclear@26
|
102 }
|
nuclear@26
|
103
|
nuclear@26
|
104 long ThreadPool::wait()
|
nuclear@26
|
105 {
|
nuclear@26
|
106 auto start_time = steady_clock::now();
|
nuclear@26
|
107
|
nuclear@26
|
108 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
109 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
|
nuclear@26
|
110
|
nuclear@26
|
111 auto dur = steady_clock::now() - start_time;
|
nuclear@26
|
112 return duration_cast<milliseconds>(dur).count();
|
nuclear@26
|
113 }
|
nuclear@26
|
114
|
nuclear@26
|
115 long ThreadPool::wait(long timeout)
|
nuclear@26
|
116 {
|
nuclear@26
|
117 auto start_time = steady_clock::now();
|
nuclear@26
|
118 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
|
nuclear@26
|
119
|
nuclear@26
|
120 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@26
|
121 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
|
nuclear@26
|
122 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
|
nuclear@26
|
123 break;
|
nuclear@26
|
124 }
|
nuclear@26
|
125 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
|
nuclear@26
|
126 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
|
nuclear@26
|
127 }
|
nuclear@26
|
128
|
nuclear@26
|
129 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
|
nuclear@26
|
130 nactive, qsize, workq.empty() ? "true" : "false");*/
|
nuclear@26
|
131 return dur.count();
|
nuclear@26
|
132 }
|
nuclear@26
|
133
|
nuclear@26
|
134 void ThreadPool::thread_func()
|
nuclear@26
|
135 {
|
nuclear@26
|
136 std::unique_lock<std::mutex> lock(workq_mutex);
|
nuclear@47
|
137 while(!quit) {
|
nuclear@26
|
138 workq_condvar.wait(lock);
|
nuclear@26
|
139
|
nuclear@26
|
140 while(!quit && !workq.empty()) {
|
nuclear@26
|
141 WorkItem witem = workq.front();
|
nuclear@26
|
142 workq.pop_front();
|
nuclear@26
|
143 ++nactive;
|
nuclear@26
|
144 --qsize;
|
nuclear@26
|
145 lock.unlock();
|
nuclear@26
|
146
|
nuclear@26
|
147 witem.work();
|
nuclear@26
|
148 if(witem.done) {
|
nuclear@26
|
149 witem.done();
|
nuclear@26
|
150 }
|
nuclear@26
|
151
|
nuclear@26
|
152 lock.lock();
|
nuclear@26
|
153 --nactive;
|
nuclear@26
|
154 done_condvar.notify_all();
|
nuclear@26
|
155 }
|
nuclear@26
|
156 }
|
nuclear@26
|
157 }
|
nuclear@26
|
158
|