libresman

view src/threadpool.c @ 12:84f55eab27cb

ok now it sortof works, probably something is failing in the done callback mechanism
author John Tsiombikas <nuclear@member.fsf.org>
date Sat, 08 Feb 2014 04:21:08 +0200
parents bebc065a941f
children 2b8281a146af
line source
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include "threadpool.h"
/* One queued job: a node of the singly linked work queue. */
struct work_item {
	void *data;               /* opaque payload, handed to the work function as-is */
	struct work_item *next;   /* following node in the queue, or 0 for the last one */
};
12 struct thread_pool {
13 pthread_t *workers;
14 int num_workers;
16 pthread_mutex_t work_lock;
17 pthread_cond_t work_cond;
19 tpool_work_func work_func;
20 void *cls;
22 struct work_item *work_list, *work_list_tail;
23 int work_count;
24 };
/* file-private helpers, defined further down */

/* entry point of every worker thread; tp is the owning struct thread_pool */
static void *thread_func(void *tp);
/* work queue node allocation / release */
static struct work_item *alloc_node(void);
static void free_node(struct work_item *node);
/* number of processors, used when the caller doesn't specify a thread count */
static int get_processor_count(void);
34 int tpool_init(struct thread_pool *tpool, int num_threads)
35 {
36 int i;
38 memset(tpool, 0, sizeof *tpool);
41 pthread_mutex_init(&tpool->work_lock, 0);
42 pthread_cond_init(&tpool->work_cond, 0);
45 if(num_threads <= 0) {
46 num_threads = get_processor_count();
47 }
48 tpool->num_workers = num_threads;
50 printf("initializing thread pool with %d worker threads\n", num_threads);
52 if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
53 fprintf(stderr, "failed to create array of %d threads\n", num_threads);
54 return -1;
55 }
57 for(i=0; i<num_threads; i++) {
58 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
59 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
60 tpool_destroy(tpool);
61 return -1;
62 }
63 }
64 return 0;
65 }
67 void tpool_destroy(struct thread_pool *tpool)
68 {
69 int i;
70 for(i=0; i<tpool->num_workers; i++) {
71 void *ret;
72 pthread_join(tpool->workers[i], &ret);
73 }
75 pthread_mutex_destroy(&tpool->work_lock);
76 pthread_cond_destroy(&tpool->work_cond);
77 }
79 struct thread_pool *tpool_create(int num_threads)
80 {
81 struct thread_pool *tpool = malloc(sizeof *tpool);
82 if(!tpool) {
83 return 0;
84 }
85 if(tpool_init(tpool, num_threads) == -1) {
86 free(tpool);
87 return 0;
88 }
89 return tpool;
90 }
/* Destroys a pool with tpool_destroy and frees the structure itself.
 * Passing a null pointer is allowed and does nothing.
 */
void tpool_free(struct thread_pool *tpool)
{
	if(!tpool) {
		return;
	}
	tpool_destroy(tpool);
	free(tpool);
}
100 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
101 {
102 tpool->work_func = func;
103 tpool->cls = cls;
104 }
106 int tpool_add_work(struct thread_pool *tpool, void *data)
107 {
108 struct work_item *node;
110 if(!(node = alloc_node())) {
111 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
112 return -1;
113 }
114 node->data = data;
115 node->next = 0;
117 pthread_mutex_lock(&tpool->work_lock);
119 if(!tpool->work_list) {
120 tpool->work_list = tpool->work_list_tail = node;
121 } else {
122 tpool->work_list_tail->next = node;
123 tpool->work_list_tail = node;
124 }
125 pthread_mutex_unlock(&tpool->work_lock);
127 /* wakeup all threads, there's work to do */
128 pthread_cond_broadcast(&tpool->work_cond);
129 return 0;
130 }
133 static void *thread_func(void *tp)
134 {
135 struct work_item *job;
136 struct thread_pool *tpool = tp;
138 pthread_mutex_lock(&tpool->work_lock);
139 for(;;) {
140 /* while there aren't any work items to do go to sleep on the condvar */
141 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
142 if(!tpool->work_list) {
143 continue; /* spurious wakeup, go back to sleep */
144 }
146 job = tpool->work_list;
147 tpool->work_list = tpool->work_list->next;
149 tpool->work_func(job->data, tpool->cls);
151 free_node(job);
152 }
153 pthread_mutex_unlock(&tpool->work_lock);
154 return 0;
155 }
157 /* TODO: custom allocator */
158 static struct work_item *alloc_node(void)
159 {
160 return malloc(sizeof(struct work_item));
161 }
/* Releases a work queue node. Only the node itself is freed, not the data
 * pointer stored in it.
 */
static void free_node(struct work_item *node)
{
	free(node);
}
/* Platform-specific setup for detecting the number of processors in the
 * system, which the thread pool uses to decide how many threads to spawn
 * when the caller doesn't say.
 * Code paths exist for: Linux, BSD (sysctl), Darwin, SGI IRIX, and Windows.
 */

/* Darwin doesn't necessarily define the unix/bsd macros tested below */
#if defined(__APPLE__) && defined(__MACH__)
#	if !defined(__unix__)
#		define __unix__	1
#	endif
#	if !defined(__bsd__)
#		define __bsd__	1
#	endif
#endif	/* __APPLE__ && __MACH__ */

#if defined(unix) || defined(__unix__)
#	include <unistd.h>
#	if defined(__bsd__)
#		include <sys/sysctl.h>
#	endif
#endif	/* unix */

#if defined(WIN32) || defined(__WIN32__)
#	include <windows.h>
#endif	/* windows */
/* Returns the number of processors reported by the operating system.
 *
 * The result is always at least 1: if the platform isn't one of those
 * handled below, or the system query fails, 1 is returned so that callers
 * can use the value directly as a thread count.
 */
static int get_processor_count(void)
{
	long count = 1;

#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the number of processors through sysctl */
	int num = 0, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	/* num is only meaningful if the call succeeded */
	if(sysctl(mib, 2, &num, &len, 0, 0) == 0) {
		count = num;
	}

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	count = sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	count = sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	count = (long)info.dwNumberOfProcessors;
#endif

	/* sysconf returns -1 on error; never hand back less than one processor */
	return count < 1 ? 1 : (int)count;
}