#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "threadpool.h"

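/* Overview: a fixed set of worker threads all pull items off the singly linked
 * work_list, which is protected by work_lock and signalled through work_cond.
 * Callers queue items with tpool_add_work() and the registered work_func is
 * invoked once per item on whichever worker picks it up.
 */
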
struct work_item {
	int id;		/* just for debugging messages */
	void *data;
	struct work_item *next;
};

struct thread_pool {
	pthread_t *workers;
	int num_workers;

	pthread_mutex_t work_lock;
	pthread_cond_t work_cond;

	int start;
	pthread_cond_t start_cond;

	tpool_work_func work_func;
	void *cls;

	struct work_item *work_list, *work_list_tail;
	int work_count;
};


static void *thread_func(void *tp);
static struct work_item *alloc_node(void);
static void free_node(struct work_item *node);
static int get_processor_count(void);


int tpool_init(struct thread_pool *tpool, int num_threads)
{
	int i;

	memset(tpool, 0, sizeof *tpool);

	pthread_mutex_init(&tpool->work_lock, 0);
	pthread_cond_init(&tpool->work_cond, 0);

	if(num_threads <= 0) {
		num_threads = get_processor_count();
	}
	tpool->num_workers = num_threads;

	printf("initializing thread pool with %d worker threads\n", num_threads);

	if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
		fprintf(stderr, "failed to allocate array of %d threads\n", num_threads);
		return -1;
	}

	/* this start condvar is pretty useless */
	pthread_cond_init(&tpool->start_cond, 0);

	for(i=0; i<num_threads; i++) {
		/* pthread_create returns 0 on success or an error number on failure */
		if(pthread_create(tpool->workers + i, 0, thread_func, tpool) != 0) {
			fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
			tpool_destroy(tpool);
			return -1;
		}
	}

	/* set the start flag under the lock so the workers can't miss the wakeup */
	pthread_mutex_lock(&tpool->work_lock);
	tpool->start = 1;
	pthread_cond_broadcast(&tpool->start_cond);
	pthread_mutex_unlock(&tpool->work_lock);
	return 0;
}

void tpool_destroy(struct thread_pool *tpool)
{
	int i;

	/* NOTE: the workers never exit their loop (see thread_func), so unless the
	 * threads are terminated by some external means these joins will block */
	for(i=0; i<tpool->num_workers; i++) {
		void *ret;
		pthread_join(tpool->workers[i], &ret);
	}
	free(tpool->workers);

	pthread_mutex_destroy(&tpool->work_lock);
	pthread_cond_destroy(&tpool->work_cond);
	pthread_cond_destroy(&tpool->start_cond);
}

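/* One way to make tpool_destroy return cleanly (a sketch only, not part of this
 * file's design) would be to add an "int quit" flag to struct thread_pool, set
 * it in tpool_destroy under work_lock followed by a broadcast on work_cond, and
 * have the worker loop in thread_func bail out when it sees it, e.g.:
 *
 *   while(!tpool->work_list && !tpool->quit) {
 *       pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
 *   }
 *   if(tpool->quit) {
 *       pthread_mutex_unlock(&tpool->work_lock);
 *       break;
 *   }
 */
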
struct thread_pool *tpool_create(int num_threads)
{
	struct thread_pool *tpool = malloc(sizeof *tpool);
	if(!tpool) {
		return 0;
	}
	if(tpool_init(tpool, num_threads) == -1) {
		free(tpool);
		return 0;
	}
	return tpool;
}

void tpool_free(struct thread_pool *tpool)
{
	if(tpool) {
		tpool_destroy(tpool);
		free(tpool);
	}
}

void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
{
	tpool->work_func = func;
	tpool->cls = cls;
}

int tpool_add_work(struct thread_pool *tpool, void *data)
{
	struct work_item *node;
	static int jcounter;

	if(!(node = alloc_node())) {
		fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
		return -1;
	}
	node->data = data;
	node->next = 0;

	pthread_mutex_lock(&tpool->work_lock);
	node->id = jcounter++;

	printf("TPOOL: adding work item: %d\n", node->id);

	/* append the new node to the tail of the work list */
	if(!tpool->work_list) {
		tpool->work_list = tpool->work_list_tail = node;
	} else {
		tpool->work_list_tail->next = node;
		tpool->work_list_tail = node;
	}
	pthread_mutex_unlock(&tpool->work_lock);

	/* wake up all threads, there's work to do */
	pthread_cond_broadcast(&tpool->work_cond);
	return 0;
}

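/* Usage sketch (illustrative only; assumes threadpool.h declares
 * tpool_work_func as a function taking (void *data, void *cls), which is how
 * work_func is invoked in thread_func below; my_work_func is hypothetical):
 *
 *   struct thread_pool *tp = tpool_create(0);    // 0: autodetect CPU count
 *   tpool_set_work_func(tp, my_work_func, 0);
 *   tpool_add_work(tp, some_item);               // queues the item, wakes workers
 *   ...
 *   tpool_free(tp);
 */
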
static void *thread_func(void *tp)
{
	int i, tidx = -1;
	struct work_item *job;
	struct thread_pool *tpool = tp;
	pthread_t tid = pthread_self();

	/* wait for the start signal :) */
	pthread_mutex_lock(&tpool->work_lock);
	while(!tpool->start) {
		pthread_cond_wait(&tpool->start_cond, &tpool->work_lock);
	}
	pthread_mutex_unlock(&tpool->work_lock);

	/* figure out which worker slot we occupy, for the debug messages */
	for(i=0; i<tpool->num_workers; i++) {
		if(pthread_equal(tpool->workers[i], tid)) {
			tidx = i;
			break;
		}
	}

	for(;;) {
		int job_id;
		void *data;

		pthread_mutex_lock(&tpool->work_lock);
		/* while there aren't any work items to do go to sleep on the condvar */
		while(!tpool->work_list) {
			pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
		}

		/* grab the first item off the work list */
		job = tpool->work_list;
		tpool->work_list = tpool->work_list->next;

		job_id = job->id;
		data = job->data;
		free_node(job);
		pthread_mutex_unlock(&tpool->work_lock);

		printf("TPOOL: worker %d start job: %d\n", tidx, job_id);
		tpool->work_func(data, tpool->cls);
		printf("TPOOL: worker %d completed job: %d\n", tidx, job_id);
	}
	return 0;
}

/* TODO: custom allocator */
static struct work_item *alloc_node(void)
{
	return malloc(sizeof(struct work_item));
}

static void free_node(struct work_item *node)
{
	free(node);
}

/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, and Windows.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
# define __unix__	1
# endif	/* unix */
# ifndef __bsd__
# define __bsd__	1
# endif	/* bsd */
#endif	/* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>

# ifdef __bsd__
# include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
#include <windows.h>
#endif


static int get_processor_count(void)
{
#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the num.processors through sysctl */
	int num, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	sysctl(mib, 2, &num, &len, 0, 0);
	return num;

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	return sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	return sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	return info.dwNumberOfProcessors;
#else
	/* unknown platform: fall back to a single worker thread */
	return 1;
#endif
}