libresman

view src/threadpool.c @ 18:711698580eb0

fixed visual studio build directories fixed debug id problem with the thread pool
author John Tsiombikas <nuclear@member.fsf.org>
date Wed, 12 Feb 2014 06:53:30 +0200
parents 2b8281a146af
children fe0dbdfbe403
line source
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include "threadpool.h"
7 struct work_item {
8 int id; /* just for debugging messages */
9 void *data;
10 struct work_item *next;
11 };
13 struct thread_pool {
14 pthread_t *workers;
15 int num_workers;
17 pthread_mutex_t work_lock;
18 pthread_cond_t work_cond;
20 int start;
21 pthread_cond_t start_cond;
23 tpool_work_func work_func;
24 void *cls;
26 struct work_item *work_list, *work_list_tail;
27 int work_count;
28 };
31 static void *thread_func(void *tp);
32 static struct work_item *alloc_node(void);
33 static void free_node(struct work_item *node);
34 static int get_processor_count(void);
38 int tpool_init(struct thread_pool *tpool, int num_threads)
39 {
40 int i;
42 memset(tpool, 0, sizeof *tpool);
45 pthread_mutex_init(&tpool->work_lock, 0);
46 pthread_cond_init(&tpool->work_cond, 0);
49 if(num_threads <= 0) {
50 num_threads = get_processor_count();
51 }
52 tpool->num_workers = num_threads;
54 printf("initializing thread pool with %d worker threads\n", num_threads);
56 if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
57 fprintf(stderr, "failed to create array of %d threads\n", num_threads);
58 return -1;
59 }
61 /* this start condvar is pretty useless */
62 pthread_cond_init(&tpool->start_cond, 0);
64 for(i=0; i<num_threads; i++) {
65 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
66 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
67 tpool_destroy(tpool);
68 return -1;
69 }
70 }
71 tpool->start = 1;
72 pthread_cond_broadcast(&tpool->start_cond);
73 return 0;
74 }
76 void tpool_destroy(struct thread_pool *tpool)
77 {
78 int i;
79 for(i=0; i<tpool->num_workers; i++) {
80 void *ret;
81 pthread_join(tpool->workers[i], &ret);
82 }
84 pthread_mutex_destroy(&tpool->work_lock);
85 pthread_cond_destroy(&tpool->work_cond);
86 }
88 struct thread_pool *tpool_create(int num_threads)
89 {
90 struct thread_pool *tpool = malloc(sizeof *tpool);
91 if(!tpool) {
92 return 0;
93 }
94 if(tpool_init(tpool, num_threads) == -1) {
95 free(tpool);
96 return 0;
97 }
98 return tpool;
99 }
101 void tpool_free(struct thread_pool *tpool)
102 {
103 if(tpool) {
104 tpool_destroy(tpool);
105 free(tpool);
106 }
107 }
109 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
110 {
111 tpool->work_func = func;
112 tpool->cls = cls;
113 }
115 int tpool_add_work(struct thread_pool *tpool, void *data)
116 {
117 struct work_item *node;
118 static int jcounter;
120 if(!(node = alloc_node())) {
121 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
122 return -1;
123 }
124 node->data = data;
125 node->next = 0;
127 pthread_mutex_lock(&tpool->work_lock);
128 node->id = jcounter++;
130 printf("TPOOL: adding work item: %d\n", node->id);
132 if(!tpool->work_list) {
133 tpool->work_list = tpool->work_list_tail = node;
134 } else {
135 tpool->work_list_tail->next = node;
136 tpool->work_list_tail = node;
137 }
138 pthread_mutex_unlock(&tpool->work_lock);
140 /* wakeup all threads, there's work to do */
141 pthread_cond_broadcast(&tpool->work_cond);
142 return 0;
143 }
146 static void *thread_func(void *tp)
147 {
148 int i, tidx = -1;
149 struct work_item *job;
150 struct thread_pool *tpool = tp;
151 pthread_t tid = pthread_self();
153 /* wait for the start signal :) */
154 pthread_mutex_lock(&tpool->work_lock);
155 while(!tpool->start) {
156 pthread_cond_wait(&tpool->start_cond, &tpool->work_lock);
157 }
158 pthread_mutex_unlock(&tpool->work_lock);
160 for(i=0; i<tpool->num_workers; i++) {
161 if(pthread_equal(tpool->workers[i], tid)) {
162 tidx = i;
163 break;
164 }
165 }
167 pthread_mutex_lock(&tpool->work_lock);
168 for(;;) {
169 /* while there aren't any work items to do go to sleep on the condvar */
170 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
171 if(!tpool->work_list) {
172 continue; /* spurious wakeup, go back to sleep */
173 }
175 job = tpool->work_list;
176 tpool->work_list = tpool->work_list->next;
178 printf("TPOOL: worker %d start job: %d\n", tidx, job->id);
179 tpool->work_func(job->data, tpool->cls);
180 printf("TPOOL: worker %d completed job: %d\n", tidx, job->id);
181 free_node(job);
182 }
183 pthread_mutex_unlock(&tpool->work_lock);
184 return 0;
185 }
187 /* TODO: custom allocator */
188 static struct work_item *alloc_node(void)
189 {
190 return malloc(sizeof(struct work_item));
191 }
193 static void free_node(struct work_item *node)
194 {
195 free(node);
196 }
198 /* The following highly platform-specific code detects the number
199 * of processors available in the system. It's used by the thread pool
200 * to autodetect how many threads to spawn.
201 * Currently works on: Linux, BSD, Darwin, and Windows.
202 */
204 #if defined(__APPLE__) && defined(__MACH__)
205 # ifndef __unix__
206 # define __unix__ 1
207 # endif /* unix */
208 # ifndef __bsd__
209 # define __bsd__ 1
210 # endif /* bsd */
211 #endif /* apple */
213 #if defined(unix) || defined(__unix__)
214 #include <unistd.h>
216 # ifdef __bsd__
217 # include <sys/sysctl.h>
218 # endif
219 #endif
221 #if defined(WIN32) || defined(__WIN32__)
222 #include <windows.h>
223 #endif
226 static int get_processor_count(void)
227 {
228 #if defined(unix) || defined(__unix__)
229 # if defined(__bsd__)
230 /* BSD systems provide the num.processors through sysctl */
231 int num, mib[] = {CTL_HW, HW_NCPU};
232 size_t len = sizeof num;
234 sysctl(mib, 2, &num, &len, 0, 0);
235 return num;
237 # elif defined(__sgi)
238 /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
239 return sysconf(_SC_NPROC_ONLN);
240 # else
241 /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
242 return sysconf(_SC_NPROCESSORS_ONLN);
243 # endif /* bsd/sgi/other */
245 #elif defined(WIN32) || defined(__WIN32__)
246 /* under windows we need to call GetSystemInfo */
247 SYSTEM_INFO info;
248 GetSystemInfo(&info);
249 return info.dwNumberOfProcessors;
250 #endif
251 }