#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "threadpool.h"
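
/* Note: threadpool.h is not shown in this file. From the way it is used below
 * it is assumed to declare struct thread_pool, the tpool_* functions defined
 * here, and the work callback type roughly as
 *	typedef void (*tpool_work_func)(void *data, void *cls);
 * (the callback's exact return type is an assumption, not confirmed here).
 */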

struct work_item {
	void *data;
	struct work_item *next;
};

struct thread_pool {
	pthread_t *workers;
	int num_workers;

	pthread_mutex_t work_lock;
	pthread_cond_t work_cond;

	tpool_work_func work_func;
	void *cls;

	struct work_item *work_list, *work_list_tail;
	int work_count;

	int should_quit;	/* set by tpool_destroy to make the worker threads exit */
};


static void *thread_func(void *tp);
static struct work_item *alloc_node(void);
static void free_node(struct work_item *node);
static int get_processor_count(void);


int tpool_init(struct thread_pool *tpool, int num_threads)
{
	int i;

	memset(tpool, 0, sizeof *tpool);

	if(num_threads <= 0) {
		num_threads = get_processor_count();
	}
	tpool->num_workers = num_threads;

	if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
		fprintf(stderr, "%s: failed to allocate the worker thread array\n", __FUNCTION__);
		return -1;
	}

	printf("initializing thread pool with %d worker threads\n", num_threads);

	/* the mutex and condition variable must be ready before any worker starts */
	pthread_mutex_init(&tpool->work_lock, 0);
	pthread_cond_init(&tpool->work_cond, 0);

	for(i=0; i<num_threads; i++) {
		/* pthread_create returns 0 on success, or an error number on failure */
		if(pthread_create(tpool->workers + i, 0, thread_func, tpool) != 0) {
			fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
			tpool->num_workers = i;	/* join only the threads that were created */
			tpool_destroy(tpool);
			return -1;
		}
	}
	return 0;
}

void tpool_destroy(struct thread_pool *tpool)
{
	int i;

	/* wake up all the workers and tell them to exit */
	pthread_mutex_lock(&tpool->work_lock);
	tpool->should_quit = 1;
	pthread_cond_broadcast(&tpool->work_cond);
	pthread_mutex_unlock(&tpool->work_lock);

	for(i=0; i<tpool->num_workers; i++) {
		void *ret;
		pthread_join(tpool->workers[i], &ret);
	}
	free(tpool->workers);

	pthread_mutex_destroy(&tpool->work_lock);
	pthread_cond_destroy(&tpool->work_cond);
}

struct thread_pool *tpool_create(int num_threads)
{
	struct thread_pool *tpool = malloc(sizeof *tpool);
	if(!tpool) {
		return 0;
	}
	if(tpool_init(tpool, num_threads) == -1) {
		free(tpool);
		return 0;
	}
	return tpool;
}

void tpool_free(struct thread_pool *tpool)
{
	if(tpool) {
		tpool_destroy(tpool);
		free(tpool);
	}
}

void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
{
	tpool->work_func = func;
	tpool->cls = cls;
}

int tpool_add_work(struct thread_pool *tpool, void *data)
{
	struct work_item *node;

	if(!(node = alloc_node())) {
		fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
		return -1;
	}
	node->data = data;
	node->next = 0;

	pthread_mutex_lock(&tpool->work_lock);

	if(!tpool->work_list) {
		tpool->work_list = tpool->work_list_tail = node;
	} else {
		tpool->work_list_tail->next = node;
		tpool->work_list_tail = node;
	}

	/* wake up a sleeping worker to pick up the new work item */
	pthread_cond_signal(&tpool->work_cond);

	pthread_mutex_unlock(&tpool->work_lock);
	return 0;
}


static void *thread_func(void *tp)
{
	struct work_item *job;
	struct thread_pool *tpool = tp;

	pthread_mutex_lock(&tpool->work_lock);
	for(;;) {
		/* while there aren't any work items to do, go to sleep on the condvar */
		while(!tpool->work_list && !tpool->should_quit) {
			pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
		}
		if(tpool->should_quit) {
			break;
		}

		job = tpool->work_list;
		tpool->work_list = tpool->work_list->next;

		/* release the lock while the work function runs, so that the rest of
		 * the workers can pick up jobs in parallel
		 */
		pthread_mutex_unlock(&tpool->work_lock);
		tpool->work_func(job->data, tpool->cls);
		free_node(job);
		pthread_mutex_lock(&tpool->work_lock);
	}
	pthread_mutex_unlock(&tpool->work_lock);
	return 0;
}

/* TODO: custom allocator */
static struct work_item *alloc_node(void)
{
	return malloc(sizeof(struct work_item));
}

static void free_node(struct work_item *node)
{
	free(node);
}

/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, and Windows.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__	1
# endif	/* unix */
# ifndef __bsd__
#  define __bsd__	1
# endif	/* bsd */
#endif	/* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>

# ifdef __bsd__
#  include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
#include <windows.h>
#endif


static int get_processor_count(void)
{
#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the number of processors through sysctl */
	int num, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	sysctl(mib, 2, &num, &len, 0, 0);
	return num;

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	return sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	return sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	return info.dwNumberOfProcessors;
#else
	return 1;	/* unknown platform: fall back to a single worker thread */
#endif
}
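
/* Usage sketch (not part of the original file): a minimal, hypothetical driver
 * for the API defined above. It assumes threadpool.h exposes tpool_create,
 * tpool_set_work_func, tpool_add_work and tpool_free with the signatures used
 * in this file, and that the work callback returns void. Passing 0 to
 * tpool_create asks for one worker per processor, and tpool_free joins the
 * workers. Note that nothing in this file lets the caller wait for queued work
 * to finish, so a real program would need its own completion signalling before
 * freeing the pool.
 *
 *	static void process(void *data, void *cls)
 *	{
 *		printf("processing job %d\n", *(int*)data);
 *	}
 *
 *	int main(void)
 *	{
 *		int i, jobs[8];
 *		struct thread_pool *tp = tpool_create(0);
 *		tpool_set_work_func(tp, process, 0);
 *		for(i=0; i<8; i++) {
 *			jobs[i] = i;
 *			tpool_add_work(tp, jobs + i);
 *		}
 *		tpool_free(tp);
 *		return 0;
 *	}
 */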