libresman

view src/threadpool.c @ 8:86465d3d05d2

some leftover unsaved changes
author John Tsiombikas <nuclear@member.fsf.org>
date Mon, 03 Feb 2014 05:24:22 +0200
parents
children 03f3de659c32
line source
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <pthread.h>
5 #include "threadpool.h"
/* One node of the singly linked queue of pending jobs. */
struct work_item {
	void *data;			/* opaque payload handed to tpool_add_work */
	struct work_item *next;		/* following node, or null for the last one */
};
12 struct thread_pool {
13 pthread_t *workers;
14 int num_workers;
16 pthread_mutex_t work_lock;
17 pthread_cond_t work_cond;
19 tpool_work_func work_func;
20 void *cls;
22 struct work_item *work_list, *work_list_tail;
23 int work_count;
24 };
27 static void *thread_func(void *tp);
28 static struct work_item *alloc_node(void);
29 static void free_node(struct work_item *node);
30 static int get_processor_count(void);
34 int tpool_init(struct thread_pool *tpool, int num_threads)
35 {
36 int i;
38 memset(tpool, 0, sizeof *tpool);
40 if(num_threads <= 0) {
41 num_threads = get_processor_count();
42 }
43 tpool->num_workers = num_threads;
45 printf("initializing thread pool with %d worker threads\n", num_threads);
47 for(i=0; i<num_threads; i++) {
48 if(pthread_create(tpool->workers + i, 0, thread_func, tpool) == -1) {
49 fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i);
50 tpool_destroy(tpool);
51 return -1;
52 }
53 }
55 pthread_mutex_init(&tpool->work_lock, 0);
56 pthread_cond_init(&tpool->work_cond, 0);
57 return 0;
58 }
60 void tpool_destroy(struct thread_pool *tpool)
61 {
62 int i;
63 for(i=0; i<tpool->num_workers; i++) {
64 void *ret;
65 pthread_join(tpool->workers[i], &ret);
66 }
68 pthread_mutex_destroy(&tpool->work_lock);
69 pthread_cond_destroy(&tpool->work_cond);
70 }
72 void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls)
73 {
74 tpool->work_func = func;
75 tpool->cls = cls;
76 }
78 int tpool_add_work(struct thread_pool *tpool, void *data)
79 {
80 struct work_item *node;
82 if(!(node = alloc_node())) {
83 fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__);
84 return -1;
85 }
86 node->data = data;
87 node->next = 0;
89 pthread_mutex_lock(&tpool->work_lock);
91 if(!tpool->work_list) {
92 tpool->work_list = tpool->work_list_tail = node;
93 } else {
94 tpool->work_list_tail->next = node;
95 tpool->work_list_tail = node;
96 }
98 pthread_mutex_unlock(&tpool->work_lock);
99 return 0;
100 }
103 static void *thread_func(void *tp)
104 {
105 struct work_item *job;
106 struct thread_pool *tpool = tp;
108 pthread_mutex_lock(&tpool->work_lock);
109 for(;;) {
110 /* while there aren't any work items to do go to sleep on the condvar */
111 pthread_cond_wait(&tpool->work_cond, &tpool->work_lock);
112 if(!tpool->work_list) {
113 continue; /* spurious wakeup, go back to sleep */
114 }
116 job = tpool->work_list;
117 tpool->work_list = tpool->work_list->next;
119 tpool->work_func(job->data, tpool->cls);
120 }
121 pthread_mutex_unlock(&tpool->work_lock);
122 return 0;
123 }
125 /* TODO: custom allocator */
126 static struct work_item *alloc_node(void)
127 {
128 return malloc(sizeof(struct work_item));
129 }
/* Releases a queue node by passing it to free. */
static void free_node(struct work_item *node)
{
	free(node);
}
/* The following highly platform-specific code detects the number
 * of processors available in the system. It's used by the thread pool
 * to autodetect how many threads to spawn.
 * Currently works on: Linux, BSD, Darwin, and Windows.
 *
 * Note: __bsd__ is only defined here for Darwin, so unless the build
 * defines it, other BSDs go through the generic sysconf path.
 */

#if defined(__APPLE__) && defined(__MACH__)
# ifndef __unix__
#  define __unix__ 1
# endif	/* unix */
# ifndef __bsd__
#  define __bsd__ 1
# endif	/* bsd */
#endif	/* apple */

#if defined(unix) || defined(__unix__)
#include <unistd.h>

# ifdef __bsd__
#  include <sys/types.h>
#  include <sys/sysctl.h>
# endif
#endif

/* _WIN32 is the macro predefined by the compiler itself; WIN32 and
 * __WIN32__ are only provided by some toolchains or build setups
 */
#if defined(WIN32) || defined(__WIN32__) || defined(_WIN32)
#include <windows.h>
#endif


/* Returns the number of processors reported by the system.
 * The result is always at least 1: if the platform is not recognized, or the
 * query fails, 1 is returned so that callers can use it directly as a thread
 * count.
 */
static int get_processor_count(void)
{
	long count = 0;

#if defined(unix) || defined(__unix__)
# if defined(__bsd__)
	/* BSD systems provide the num.processors through sysctl */
	int num = 0, mib[] = {CTL_HW, HW_NCPU};
	size_t len = sizeof num;

	if(sysctl(mib, 2, &num, &len, 0, 0) == 0) {
		count = num;
	}

# elif defined(__sgi)
	/* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */
	count = sysconf(_SC_NPROC_ONLN);
# else
	/* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */
	count = sysconf(_SC_NPROCESSORS_ONLN);
# endif	/* bsd/sgi/other */

#elif defined(WIN32) || defined(__WIN32__) || defined(_WIN32)
	/* under windows we need to call GetSystemInfo */
	SYSTEM_INFO info;
	GetSystemInfo(&info);
	count = (long)info.dwNumberOfProcessors;
#endif

	/* sysconf returns -1 on error, and unknown platforms leave count at 0 */
	return count > 0 ? (int)count : 1;
}