libresman

annotate src/threadpool.c @ 11:bebc065a941f

doesn't work yet
author John Tsiombikas <nuclear@member.fsf.org>
date Fri, 07 Feb 2014 07:50:02 +0200
parents 4d18498a0078
children 84f55eab27cb
rev   line source
nuclear@6 1 #include <stdio.h>
nuclear@6 2 #include <stdlib.h>
nuclear@6 3 #include <string.h>
nuclear@6 4 #include <pthread.h>
nuclear@6 5 #include "threadpool.h"
nuclear@6 6
nuclear@6 7 struct work_item {
nuclear@6 8 void *data;
nuclear@6 9 struct work_item *next;
nuclear@6 10 };
nuclear@6 11
nuclear@6 12 struct thread_pool {
nuclear@6 13 pthread_t *workers;
nuclear@6 14 int num_workers;
nuclear@6 15
nuclear@6 16 pthread_mutex_t work_lock;
nuclear@6 17 pthread_cond_t work_cond;
nuclear@6 18
nuclear@6 19 tpool_work_func work_func;
nuclear@6 20 void *cls;
nuclear@6 21
nuclear@6 22 struct work_item *work_list, *work_list_tail;
nuclear@6 23 int work_count;
nuclear@6 24 };
nuclear@6 25
nuclear@6 26
nuclear@6 27 static void *thread_func(void *tp);
nuclear@6 28 static struct work_item *alloc_node(void);
nuclear@6 29 static void free_node(struct work_item *node);
nuclear@6 30 static int get_processor_count(void);
nuclear@6 31
nuclear@6 32
nuclear@6 33
nuclear@6 34 int tpool_init(struct thread_pool *tpool, int num_threads)
nuclear@6 35 {
nuclear@6 36 int i;
nuclear@6 37
nuclear@6 38 memset(tpool, 0, sizeof *tpool);
nuclear@6 39
nuclear@11 40
nuclear@11 41 pthread_mutex_init(&tpool->work_lock, 0);
nuclear@11 42 pthread_cond_init(&tpool->work_cond, 0);
nuclear@11 43
nuclear@11 44
nuclear@6 45 if(num_threads <= 0) {
nuclear@6 46 num_threads = get_processor_count();
nuclear@6 47 }
nuclear@6 48 tpool->num_workers = num_threads;
nuclear@6 49
nuclear@6 50 printf("initializing thread pool with %d worker threads\n", num_threads);
nuclear@6 51
nuclear@11 52 if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) {
nuclear@11 53 fprintf(stderr, "failed to create array of %d threads\n", num_threads);
nuclear@11 54 return -1;
nuclear@11 55 }
nuclear@11 56
nuclear@6 57 for(i=0; i<num_threads; i++) {
nuclear@6 58 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
nuclear@6 59 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
nuclear@6 60 tpool_destroy(tpool);
nuclear@6 61 return -1;
nuclear@6 62 }
nuclear@6 63 }
nuclear@6 64 return 0;
nuclear@6 65 }
nuclear@6 66
nuclear@6 67 void tpool_destroy(struct thread_pool *tpool)
nuclear@6 68 {
nuclear@6 69 int i;
nuclear@6 70 for(i=0; i<tpool->num_workers; i++) {
nuclear@6 71 void *ret;
nuclear@6 72 pthread_join(tpool->workers[i], &ret);
nuclear@6 73 }
nuclear@6 74
nuclear@6 75 pthread_mutex_destroy(&tpool->work_lock);
nuclear@6 76 pthread_cond_destroy(&tpool->work_cond);
nuclear@6 77 }
nuclear@6 78
nuclear@10 79 struct thread_pool *tpool_create(int num_threads)
nuclear@10 80 {
nuclear@10 81 struct thread_pool *tpool = malloc(sizeof *tpool);
nuclear@10 82 if(!tpool) {
nuclear@10 83 return 0;
nuclear@10 84 }
nuclear@10 85 if(tpool_init(tpool, num_threads) == -1) {
nuclear@10 86 free(tpool);
nuclear@10 87 return 0;
nuclear@10 88 }
nuclear@10 89 return tpool;
nuclear@10 90 }
nuclear@10 91
nuclear@10 92 void tpool_free(struct thread_pool *tpool)
nuclear@10 93 {
nuclear@10 94 if(tpool) {
nuclear@10 95 tpool_destroy(tpool);
nuclear@10 96 free(tpool);
nuclear@10 97 }
nuclear@10 98 }
nuclear@10 99
nuclear@6 100 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
nuclear@6 101 {
nuclear@6 102 tpool->work_func = func;
nuclear@6 103 tpool->cls = cls;
nuclear@6 104 }
nuclear@6 105
nuclear@6 106 int tpool_add_work(struct thread_pool *tpool, void *data)
nuclear@6 107 {
nuclear@6 108 struct work_item *node;
nuclear@6 109
nuclear@6 110 if(!(node = alloc_node())) {
nuclear@6 111 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
nuclear@6 112 return -1;
nuclear@6 113 }
nuclear@6 114 node->data = data;
nuclear@6 115 node->next = 0;
nuclear@6 116
nuclear@6 117 pthread_mutex_lock(&tpool->work_lock);
nuclear@6 118
nuclear@6 119 if(!tpool->work_list) {
nuclear@6 120 tpool->work_list = tpool->work_list_tail = node;
nuclear@6 121 } else {
nuclear@6 122 tpool->work_list_tail->next = node;
nuclear@6 123 tpool->work_list_tail = node;
nuclear@6 124 }
nuclear@6 125
nuclear@6 126 pthread_mutex_unlock(&tpool->work_lock);
nuclear@6 127 return 0;
nuclear@6 128 }
nuclear@6 129
nuclear@6 130
nuclear@6 131 static void *thread_func(void *tp)
nuclear@6 132 {
nuclear@6 133 struct work_item *job;
nuclear@6 134 struct thread_pool *tpool = tp;
nuclear@6 135
nuclear@6 136 pthread_mutex_lock(&tpool->work_lock);
nuclear@6 137 for(;;) {
nuclear@6 138 /* while there aren't any work items to do go to sleep on the condvar */
nuclear@6 139 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
nuclear@6 140 if(!tpool->work_list) {
nuclear@6 141 continue; /* spurious wakeup, go back to sleep */
nuclear@6 142 }
nuclear@6 143
nuclear@6 144 job = tpool->work_list;
nuclear@6 145 tpool->work_list = tpool->work_list->next;
nuclear@6 146
nuclear@6 147 tpool->work_func(job->data, tpool->cls);
nuclear@9 148
nuclear@9 149 free_node(job);
nuclear@6 150 }
nuclear@6 151 pthread_mutex_unlock(&tpool->work_lock);
nuclear@6 152 return 0;
nuclear@6 153 }
nuclear@6 154
nuclear@6 155 /* TODO: custom allocator */
nuclear@6 156 static struct work_item *alloc_node(void)
nuclear@6 157 {
nuclear@6 158 return malloc(sizeof(struct work_item));
nuclear@6 159 }
nuclear@6 160
nuclear@6 161 static void free_node(struct work_item *node)
nuclear@6 162 {
nuclear@6 163 free(node);
nuclear@6 164 }
nuclear@6 165
nuclear@6 166 /* The following highly platform-specific code detects the number
nuclear@6 167 * of processors available in the system. It's used by the thread pool
nuclear@6 168 * to autodetect how many threads to spawn.
nuclear@6 169 * Currently works on: Linux, BSD, Darwin, and Windows.
nuclear@6 170 */
nuclear@6 171
nuclear@6 172 #if defined(__APPLE__) && defined(__MACH__)
nuclear@6 173 # ifndef __unix__
nuclear@6 174 # define __unix__ 1
nuclear@6 175 # endif /* unix */
nuclear@6 176 # ifndef __bsd__
nuclear@6 177 # define __bsd__ 1
nuclear@6 178 # endif /* bsd */
nuclear@6 179 #endif /* apple */
nuclear@6 180
nuclear@6 181 #if defined(unix) || defined(__unix__)
nuclear@6 182 #include <unistd.h>
nuclear@6 183
nuclear@6 184 # ifdef __bsd__
nuclear@6 185 # include <sys/sysctl.h>
nuclear@6 186 # endif
nuclear@6 187 #endif
nuclear@6 188
nuclear@6 189 #if defined(WIN32) || defined(__WIN32__)
nuclear@6 190 #include <windows.h>
nuclear@6 191 #endif
nuclear@6 192
nuclear@6 193
nuclear@6 194 static int get_processor_count(void)
nuclear@6 195 {
nuclear@6 196 #if defined(unix) || defined(__unix__)
nuclear@6 197 # if defined(__bsd__)
nuclear@6 198 /* BSD systems provide the num.processors through sysctl */
nuclear@6 199 int num, mib[] = {CTL_HW, HW_NCPU};
nuclear@6 200 size_t len = sizeof num;
nuclear@6 201
nuclear@6 202 sysctl(mib, 2, &num, &len, 0, 0);
nuclear@6 203 return num;
nuclear@6 204
nuclear@6 205 # elif defined(__sgi)
nuclear@6 206 /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
nuclear@6 207 return sysconf(_SC_NPROC_ONLN);
nuclear@6 208 # else
nuclear@6 209 /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
nuclear@6 210 return sysconf(_SC_NPROCESSORS_ONLN);
nuclear@6 211 # endif /* bsd/sgi/other */
nuclear@6 212
nuclear@6 213 #elif defined(WIN32) || defined(__WIN32__)
nuclear@6 214 /* under windows we need to call GetSystemInfo */
nuclear@6 215 SYSTEM_INFO info;
nuclear@6 216 GetSystemInfo(&info);
nuclear@6 217 return info.dwNumberOfProcessors;
nuclear@6 218 #endif
nuclear@6 219 }