libresman

annotate src/threadpool.c @ 18:711698580eb0

fixed visual studio build directories; fixed debug id problem with the thread pool
author John Tsiombikas <nuclear@member.fsf.org>
date Wed, 12 Feb 2014 06:53:30 +0200
parents 2b8281a146af
children fe0dbdfbe403
rev   line source
nuclear@6 1 #include <stdio.h>
nuclear@6 2 #include <stdlib.h>
nuclear@6 3 #include <string.h>
nuclear@6 4 #include <pthread.h>
nuclear@6 5 #include "threadpool.h"
nuclear@6 6
/* A single queued unit of work; nodes form a singly-linked list. */
struct work_item {
	/* sequence number assigned when the item is queued;
	 * only used in the debugging messages
	 */
	int id;
	/* opaque pointer supplied by the caller of tpool_add_work and handed
	 * to the pool's work function as its first argument
	 */
	void *data;
	/* next item in the queue, or null for the last one */
	struct work_item *next;
};
nuclear@6 12
nuclear@6 13 struct thread_pool {
nuclear@6 14 pthread_t *workers;
nuclear@6 15 int num_workers;
nuclear@6 16
nuclear@6 17 pthread_mutex_t work_lock;
nuclear@6 18 pthread_cond_t work_cond;
nuclear@6 19
nuclear@18 20 int start;
nuclear@18 21 pthread_cond_t start_cond;
nuclear@18 22
nuclear@6 23 tpool_work_func work_func;
nuclear@6 24 void *cls;
nuclear@6 25
nuclear@6 26 struct work_item *work_list, *work_list_tail;
nuclear@6 27 int work_count;
nuclear@6 28 };
nuclear@6 29
nuclear@6 30
/* worker thread entry point; the argument is the owning struct thread_pool */
static void *thread_func(void *tp);

/* allocation and release of work queue nodes */
static struct work_item *alloc_node(void);
static void free_node(struct work_item *node);

/* number of processors, used to pick the worker count automatically */
static int get_processor_count(void);
nuclear@6 35
nuclear@6 36
nuclear@6 37
nuclear@6 38 int tpool_init(struct thread_pool *tpool, int num_threads)
nuclear@6 39 {
nuclear@6 40 int i;
nuclear@6 41
nuclear@6 42 memset(tpool, 0, sizeof *tpool);
nuclear@6 43
nuclear@11 44
nuclear@11 45 pthread_mutex_init(&tpool->work_lock, 0);
nuclear@11 46 pthread_cond_init(&tpool->work_cond, 0);
nuclear@11 47
nuclear@11 48
nuclear@6 49 if(num_threads <= 0) {
nuclear@6 50 num_threads = get_processor_count();
nuclear@6 51 }
nuclear@6 52 tpool->num_workers = num_threads;
nuclear@6 53
nuclear@6 54 printf("initializing thread pool with %d worker threads\n", num_threads);
nuclear@6 55
nuclear@11 56 if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
nuclear@11 57 fprintf(stderr, "failed to create array of %d threads\n", num_threads);
nuclear@11 58 return -1;
nuclear@11 59 }
nuclear@11 60
nuclear@18 61 /* this start condvar is pretty useless */
nuclear@18 62 pthread_cond_init(&tpool->start_cond, 0);
nuclear@18 63
nuclear@6 64 for(i=0; i<num_threads; i++) {
nuclear@6 65 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
nuclear@6 66 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
nuclear@6 67 tpool_destroy(tpool);
nuclear@6 68 return -1;
nuclear@6 69 }
nuclear@6 70 }
nuclear@18 71 tpool->start = 1;
nuclear@18 72 pthread_cond_broadcast(&tpool->start_cond);
nuclear@6 73 return 0;
nuclear@6 74 }
nuclear@6 75
nuclear@6 76 void tpool_destroy(struct thread_pool *tpool)
nuclear@6 77 {
nuclear@6 78 int i;
nuclear@6 79 for(i=0; i<tpool->num_workers; i++) {
nuclear@6 80 void *ret;
nuclear@6 81 pthread_join(tpool->workers[i], &ret);
nuclear@6 82 }
nuclear@6 83
nuclear@6 84 pthread_mutex_destroy(&tpool->work_lock);
nuclear@6 85 pthread_cond_destroy(&tpool->work_cond);
nuclear@6 86 }
nuclear@6 87
nuclear@10 88 struct thread_pool *tpool_create(int num_threads)
nuclear@10 89 {
nuclear@10 90 struct thread_pool *tpool = malloc(sizeof *tpool);
nuclear@10 91 if(!tpool) {
nuclear@10 92 return 0;
nuclear@10 93 }
nuclear@10 94 if(tpool_init(tpool, num_threads) == -1) {
nuclear@10 95 free(tpool);
nuclear@10 96 return 0;
nuclear@10 97 }
nuclear@10 98 return tpool;
nuclear@10 99 }
nuclear@10 100
/* Destroys a pool obtained from tpool_create and releases its memory.
 * Passing a null pointer is allowed and does nothing.
 */
void tpool_free(struct thread_pool *tpool)
{
	if(tpool == NULL) {
		return;
	}

	tpool_destroy(tpool);
	free(tpool);
}
nuclear@10 108
nuclear@6 109 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
nuclear@6 110 {
nuclear@6 111 tpool->work_func = func;
nuclear@6 112 tpool->cls = cls;
nuclear@6 113 }
nuclear@6 114
nuclear@6 115 int tpool_add_work(struct thread_pool *tpool, void *data)
nuclear@6 116 {
nuclear@6 117 struct work_item *node;
nuclear@15 118 static int jcounter;
nuclear@6 119
nuclear@6 120 if(!(node = alloc_node())) {
nuclear@6 121 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
nuclear@6 122 return -1;
nuclear@6 123 }
nuclear@6 124 node->data = data;
nuclear@6 125 node->next = 0;
nuclear@6 126
nuclear@6 127 pthread_mutex_lock(&tpool->work_lock);
nuclear@15 128 node->id = jcounter++;
nuclear@15 129
nuclear@15 130 printf("TPOOL: adding work item: %d\n", node->id);
nuclear@6 131
nuclear@6 132 if(!tpool->work_list) {
nuclear@6 133 tpool->work_list = tpool->work_list_tail = node;
nuclear@6 134 } else {
nuclear@6 135 tpool->work_list_tail->next = node;
nuclear@6 136 tpool->work_list_tail = node;
nuclear@6 137 }
nuclear@12 138 pthread_mutex_unlock(&tpool->work_lock);
nuclear@6 139
nuclear@12 140 /* wakeup all threads, there's work to do */
nuclear@12 141 pthread_cond_broadcast(&tpool->work_cond);
nuclear@6 142 return 0;
nuclear@6 143 }
nuclear@6 144
nuclear@6 145
nuclear@6 146 static void *thread_func(void *tp)
nuclear@6 147 {
nuclear@15 148 int i, tidx = -1;
nuclear@6 149 struct work_item *job;
nuclear@6 150 struct thread_pool *tpool = tp;
nuclear@15 151 pthread_t tid = pthread_self();
nuclear@15 152
nuclear@18 153 /* wait for the start signal :) */
nuclear@18 154 pthread_mutex_lock(&tpool->work_lock);
nuclear@18 155 while(!tpool->start) {
nuclear@18 156 pthread_cond_wait(&tpool->start_cond, &tpool->work_lock);
nuclear@18 157 }
nuclear@18 158 pthread_mutex_unlock(&tpool->work_lock);
nuclear@18 159
nuclear@15 160 for(i=0; i<tpool->num_workers; i++) {
nuclear@18 161 if(pthread_equal(tpool->workers[i], tid)) {
nuclear@15 162 tidx = i;
nuclear@15 163 break;
nuclear@15 164 }
nuclear@15 165 }
nuclear@6 166
nuclear@6 167 pthread_mutex_lock(&tpool->work_lock);
nuclear@6 168 for(;;) {
nuclear@6 169 /* while there aren't any work items to do go to sleep on the condvar */
nuclear@6 170 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
nuclear@6 171 if(!tpool->work_list) {
nuclear@6 172 continue; /* spurious wakeup, go back to sleep */
nuclear@6 173 }
nuclear@6 174
nuclear@6 175 job = tpool->work_list;
nuclear@6 176 tpool->work_list = tpool->work_list->next;
nuclear@6 177
nuclear@18 178 printf("TPOOL: worker %d start job: %d\n", tidx, job->id);
nuclear@6 179 tpool->work_func(job->data, tpool->cls);
nuclear@15 180 printf("TPOOL: worker %d completed job: %d\n", tidx, job->id);
nuclear@9 181 free_node(job);
nuclear@6 182 }
nuclear@6 183 pthread_mutex_unlock(&tpool->work_lock);
nuclear@6 184 return 0;
nuclear@6 185 }
nuclear@6 186
nuclear@6 187 /* TODO: custom allocator */
nuclear@6 188 static struct work_item *alloc_node(void)
nuclear@6 189 {
nuclear@6 190 return malloc(sizeof(struct work_item));
nuclear@6 191 }
nuclear@6 192
/* Releases a node obtained from alloc_node.
 * The data pointer stored in the node is not freed.
 */
static void free_node(struct work_item *node)
{
	free(node);
}
nuclear@6 197
nuclear@6 198 /* The following highly platform-specific code detects the number
nuclear@6 199 * of processors available in the system. It's used by the thread pool
nuclear@6 200 * to autodetect how many threads to spawn.
nuclear@6 201 * Currently works on: Linux, BSD, Darwin, and Windows.
nuclear@6 202 */
nuclear@6 203
nuclear@6 204 #if defined(__APPLE__) && defined(__MACH__)
nuclear@6 205 # ifndef __unix__
nuclear@6 206 # define __unix__ 1
nuclear@6 207 # endif /* unix */
nuclear@6 208 # ifndef __bsd__
nuclear@6 209 # define __bsd__ 1
nuclear@6 210 # endif /* bsd */
nuclear@6 211 #endif /* apple */
nuclear@6 212
nuclear@6 213 #if defined(unix) || defined(__unix__)
nuclear@6 214 #include <unistd.h>
nuclear@6 215
nuclear@6 216 # ifdef __bsd__
nuclear@6 217 # include <sys/sysctl.h>
nuclear@6 218 # endif
nuclear@6 219 #endif
nuclear@6 220
nuclear@6 221 #if defined(WIN32) || defined(__WIN32__)
nuclear@6 222 #include <windows.h>
nuclear@6 223 #endif
nuclear@6 224
nuclear@6 225
/* Returns the number of processors reported by the operating system.
 *
 * The result is always at least 1: if the platform query fails, reports a
 * non-positive value, or the platform is not recognized at all, 1 is
 * returned so that callers can use the value directly as a thread count.
 */
static int get_processor_count(void)
{
	long count = 1;

#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the num.processors through sysctl */
	int num = 0, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	/* num is only meaningful when the call succeeds */
	if(sysctl(mib, 2, &num, &len, 0, 0) == 0) {
		count = num;
	}

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	count = sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	count = sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	count = (long)info.dwNumberOfProcessors;
#endif

	/* sysconf returns -1 on error; never report less than one processor */
	return count > 0 ? (int)count : 1;
}