libresman

view src/threadpool.c @ 15:2b8281a146af

added debugging crap in the threadpool
author John Tsiombikas <nuclear@member.fsf.org>
date Tue, 11 Feb 2014 18:47:00 +0200
parents 84f55eab27cb
children 711698580eb0
line source
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include "threadpool.h"
/* One queued job: a node of the pool's singly linked list of pending work. */
struct work_item {
	int id;			/* sequence number, only used in debugging messages */
	void *data;		/* opaque pointer handed to the pool's work function */
	struct work_item *next;	/* following item in the list, or 0 for the last one */
};
13 struct thread_pool {
14 pthread_t *workers;
15 int num_workers;
17 pthread_mutex_t work_lock;
18 pthread_cond_t work_cond;
20 tpool_work_func work_func;
21 void *cls;
23 struct work_item *work_list, *work_list_tail;
24 int work_count;
25 };
/* file-private helpers, defined further down */
static int get_processor_count(void);
static struct work_item *alloc_node(void);
static void free_node(struct work_item *node);
/* entry point of every worker thread; tp is the struct thread_pool pointer */
static void *thread_func(void *tp);
35 int tpool_init(struct thread_pool *tpool, int num_threads)
36 {
37 int i;
39 memset(tpool, 0, sizeof *tpool);
42 pthread_mutex_init(&tpool->work_lock, 0);
43 pthread_cond_init(&tpool->work_cond, 0);
46 if(num_threads <= 0) {
47 num_threads = get_processor_count();
48 }
49 tpool->num_workers = num_threads;
51 printf("initializing thread pool with %d worker threads\n", num_threads);
53 if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
54 fprintf(stderr, "failed to create array of %d threads\n", num_threads);
55 return -1;
56 }
58 for(i=0; i<num_threads; i++) {
59 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
60 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
61 tpool_destroy(tpool);
62 return -1;
63 }
64 }
65 return 0;
66 }
68 void tpool_destroy(struct thread_pool *tpool)
69 {
70 int i;
71 for(i=0; i<tpool->num_workers; i++) {
72 void *ret;
73 pthread_join(tpool->workers[i], &ret);
74 }
76 pthread_mutex_destroy(&tpool->work_lock);
77 pthread_cond_destroy(&tpool->work_cond);
78 }
80 struct thread_pool *tpool_create(int num_threads)
81 {
82 struct thread_pool *tpool = malloc(sizeof *tpool);
83 if(!tpool) {
84 return 0;
85 }
86 if(tpool_init(tpool, num_threads) == -1) {
87 free(tpool);
88 return 0;
89 }
90 return tpool;
91 }
/* Destroy and deallocate a pool; a null pointer is silently ignored. */
void tpool_free(struct thread_pool *tpool)
{
	if(!tpool) {
		return;
	}

	tpool_destroy(tpool);
	free(tpool);
}
101 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
102 {
103 tpool->work_func = func;
104 tpool->cls = cls;
105 }
107 int tpool_add_work(struct thread_pool *tpool, void *data)
108 {
109 struct work_item *node;
110 static int jcounter;
112 if(!(node = alloc_node())) {
113 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
114 return -1;
115 }
116 node->data = data;
117 node->next = 0;
119 pthread_mutex_lock(&tpool->work_lock);
120 node->id = jcounter++;
122 printf("TPOOL: adding work item: %d\n", node->id);
124 if(!tpool->work_list) {
125 tpool->work_list = tpool->work_list_tail = node;
126 } else {
127 tpool->work_list_tail->next = node;
128 tpool->work_list_tail = node;
129 }
130 pthread_mutex_unlock(&tpool->work_lock);
132 /* wakeup all threads, there's work to do */
133 pthread_cond_broadcast(&tpool->work_cond);
134 return 0;
135 }
138 static void *thread_func(void *tp)
139 {
140 int i, tidx = -1;
141 struct work_item *job;
142 struct thread_pool *tpool = tp;
143 pthread_t tid = pthread_self();
145 for(i=0; i<tpool->num_workers; i++) {
146 if(tpool[i].workers[i] == tid) {
147 tidx = i;
148 break;
149 }
150 }
152 pthread_mutex_lock(&tpool->work_lock);
153 for(;;) {
154 /* while there aren't any work items to do go to sleep on the condvar */
155 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
156 if(!tpool->work_list) {
157 continue; /* spurious wakeup, go back to sleep */
158 }
160 printf("TPOOL: worker %d start job: %d\n", tidx, job->id);
162 job = tpool->work_list;
163 tpool->work_list = tpool->work_list->next;
165 tpool->work_func(job->data, tpool->cls);
167 printf("TPOOL: worker %d completed job: %d\n", tidx, job->id);
168 free_node(job);
169 }
170 pthread_mutex_unlock(&tpool->work_lock);
171 return 0;
172 }
174 /* TODO: custom allocator */
175 static struct work_item *alloc_node(void)
176 {
177 return malloc(sizeof(struct work_item));
178 }
/* Release a work item node obtained from alloc_node. */
static void free_node(struct work_item *node)
{
	free(node);
}
/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, and Windows.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__	1
# endif	/* unix */
# ifndef __bsd__
#  define __bsd__	1
# endif	/* bsd */
#endif	/* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>

# ifdef __bsd__
#  include <sys/sysctl.h>
# endif
#endif

#if defined(WIN32) || defined(__WIN32__)
#include <windows.h>
#endif


/* Returns the number of processors reported by the system, and never less
 * than 1: if the platform query fails, or no query is implemented for this
 * platform, the result is 1.
 */
static int get_processor_count(void)
{
	long count = 0;

#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the number of processors through sysctl */
	int num = 0, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	if(sysctl(mib, 2, &num, &len, 0, 0) == 0) {
		count = num;
	}

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	count = sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	count = sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	count = (long)info.dwNumberOfProcessors;
#endif

	/* sysconf returns -1 on error; never report fewer than one processor */
	return count >= 1 ? (int)count : 1;
}