erebus

view liberebus/src/threadpool.cc @ 27:0ced900e15a7

added missing <algorithm> header in erebus.cc
author John Tsiombikas <nuclear@member.fsf.org>
date Mon, 02 Jun 2014 00:53:01 +0300
parents 6ef4b10fa468
children 53a98c148bf8
line source
1 #include <algorithm>
2 #include <chrono>
3 #include "threadpool.h"
5 using namespace std::chrono;
7 ThreadPool::ThreadPool(int num_threads)
8 {
9 quit = false;
10 qsize = 0;
11 nactive = 0;
13 if(num_threads == -1) {
14 num_threads = std::thread::hardware_concurrency();
15 }
17 printf("creating thread pool with %d threads\n", num_threads);
19 thread = new std::thread[num_threads];
20 for(int i=0; i<num_threads; i++) {
21 thread[i] = std::thread(&ThreadPool::thread_func, this);
22 }
23 this->num_threads = num_threads;
24 }
26 ThreadPool::~ThreadPool()
27 {
28 quit = true;
29 workq_condvar.notify_all();
31 printf("ThreadPool: waiting for %d worker threads to stop ", num_threads);
32 fflush(stdout);
33 for(int i=0; i<num_threads; i++) {
34 thread[i].join();
35 putchar('.');
36 fflush(stdout);
37 }
38 putchar('\n');
39 }
41 void ThreadPool::add_work(std::function<void ()> func)
42 {
43 add_work(func, std::function<void ()>{});
44 }
46 void ThreadPool::add_work(std::function<void ()> work_func, std::function<void ()> done_func)
47 {
48 std::unique_lock<std::mutex> lock(workq_mutex);
49 workq.push_back(WorkItem{work_func, done_func});
50 ++qsize;
51 workq_condvar.notify_all();
52 }
54 int ThreadPool::queued() const
55 {
56 std::unique_lock<std::mutex> lock(workq_mutex);
57 return qsize;
58 }
60 int ThreadPool::active() const
61 {
62 std::unique_lock<std::mutex> lock(workq_mutex);
63 return nactive;
64 }
66 int ThreadPool::pending() const
67 {
68 std::unique_lock<std::mutex> lock(workq_mutex);
69 return nactive + qsize;
70 }
72 long ThreadPool::wait()
73 {
74 auto start_time = steady_clock::now();
76 std::unique_lock<std::mutex> lock(workq_mutex);
77 done_condvar.wait(lock, [this](){ return nactive == 0 && workq.empty(); });
79 auto dur = steady_clock::now() - start_time;
80 return duration_cast<milliseconds>(dur).count();
81 }
83 long ThreadPool::wait(long timeout)
84 {
85 auto start_time = steady_clock::now();
86 duration<long, std::milli> dur, timeout_dur(std::max(timeout, 5L));
88 std::unique_lock<std::mutex> lock(workq_mutex);
89 while(timeout_dur.count() > 0 && (nactive > 0 || !workq.empty())) {
90 if(done_condvar.wait_for(lock, timeout_dur) == std::cv_status::timeout) {
91 break;
92 }
93 dur = duration_cast<milliseconds>(steady_clock::now() - start_time);
94 timeout_dur = milliseconds(std::max(timeout, 5L)) - dur;
95 }
97 /*printf("waited for: %ld ms (%ld req) (na %d,qs %d,em %s)\n", dur.count(), timeout,
98 nactive, qsize, workq.empty() ? "true" : "false");*/
99 return dur.count();
100 }
102 void ThreadPool::thread_func()
103 {
104 std::unique_lock<std::mutex> lock(workq_mutex);
105 for(;;) {
106 if(quit) break;
108 workq_condvar.wait(lock);
110 while(!quit && !workq.empty()) {
111 WorkItem witem = workq.front();
112 workq.pop_front();
113 ++nactive;
114 --qsize;
115 lock.unlock();
117 witem.work();
118 if(witem.done) {
119 witem.done();
120 }
122 lock.lock();
123 --nactive;
124 done_condvar.notify_all();
125 }
126 }
127 }