nuclear@6: #include nuclear@6: #include nuclear@6: #include nuclear@6: #include nuclear@6: #include "threadpool.h" nuclear@6: nuclear@6: struct work_item { nuclear@15: int id; /* just for debugging messages */ nuclear@6: void *data; nuclear@6: struct work_item *next; nuclear@6: }; nuclear@6: nuclear@6: struct thread_pool { nuclear@6: pthread_t *workers; nuclear@6: int num_workers; nuclear@6: nuclear@6: pthread_mutex_t work_lock; nuclear@6: pthread_cond_t work_cond; nuclear@6: nuclear@6: tpool_work_func work_func; nuclear@6: void *cls; nuclear@6: nuclear@6: struct work_item *work_list, *work_list_tail; nuclear@6: int work_count; nuclear@6: }; nuclear@6: nuclear@6: nuclear@6: static void *thread_func(void *tp); nuclear@6: static struct work_item *alloc_node(void); nuclear@6: static void free_node(struct work_item *node); nuclear@6: static int get_processor_count(void); nuclear@6: nuclear@6: nuclear@6: nuclear@6: int tpool_init(struct thread_pool *tpool, int num_threads) nuclear@6: { nuclear@6: int i; nuclear@6: nuclear@6: memset(tpool, 0, sizeof *tpool); nuclear@6: nuclear@11: nuclear@11: pthread_mutex_init(&tpool->work_lock, 0); nuclear@11: pthread_cond_init(&tpool->work_cond, 0); nuclear@11: nuclear@11: nuclear@6: if(num_threads <= 0) { nuclear@6: num_threads = get_processor_count(); nuclear@6: } nuclear@6: tpool->num_workers = num_threads; nuclear@6: nuclear@6: printf("initializing thread pool with %d worker threads\n", num_threads); nuclear@6: nuclear@11: if(!(tpool->workers = malloc(num_threads * sizeof *tpool->workers))) { nuclear@11: fprintf(stderr, "failed to create array of %d threads\n", num_threads); nuclear@11: return -1; nuclear@11: } nuclear@11: nuclear@6: for(i=0; iworkers + i, 0, thread_func, tpool) == -1) { nuclear@6: fprintf(stderr, "%s: failed to create thread %d\n", __FUNCTION__, i); nuclear@6: tpool_destroy(tpool); nuclear@6: return -1; nuclear@6: } nuclear@6: } nuclear@6: return 0; nuclear@6: } nuclear@6: nuclear@6: void tpool_destroy(struct thread_pool *tpool) nuclear@6: { nuclear@6: int i; nuclear@6: for(i=0; inum_workers; i++) { nuclear@6: void *ret; nuclear@6: pthread_join(tpool->workers[i], &ret); nuclear@6: } nuclear@6: nuclear@6: pthread_mutex_destroy(&tpool->work_lock); nuclear@6: pthread_cond_destroy(&tpool->work_cond); nuclear@6: } nuclear@6: nuclear@10: struct thread_pool *tpool_create(int num_threads) nuclear@10: { nuclear@10: struct thread_pool *tpool = malloc(sizeof *tpool); nuclear@10: if(!tpool) { nuclear@10: return 0; nuclear@10: } nuclear@10: if(tpool_init(tpool, num_threads) == -1) { nuclear@10: free(tpool); nuclear@10: return 0; nuclear@10: } nuclear@10: return tpool; nuclear@10: } nuclear@10: nuclear@10: void tpool_free(struct thread_pool *tpool) nuclear@10: { nuclear@10: if(tpool) { nuclear@10: tpool_destroy(tpool); nuclear@10: free(tpool); nuclear@10: } nuclear@10: } nuclear@10: nuclear@6: void tpool_set_work_func(struct thread_pool *tpool, tpool_work_func func, void *cls) nuclear@6: { nuclear@6: tpool->work_func = func; nuclear@6: tpool->cls = cls; nuclear@6: } nuclear@6: nuclear@6: int tpool_add_work(struct thread_pool *tpool, void *data) nuclear@6: { nuclear@6: struct work_item *node; nuclear@15: static int jcounter; nuclear@6: nuclear@6: if(!(node = alloc_node())) { nuclear@6: fprintf(stderr, "%s: failed to allocate new work item node\n", __FUNCTION__); nuclear@6: return -1; nuclear@6: } nuclear@6: node->data = data; nuclear@6: node->next = 0; nuclear@6: nuclear@6: pthread_mutex_lock(&tpool->work_lock); nuclear@15: node->id = jcounter++; nuclear@15: nuclear@15: printf("TPOOL: adding work item: %d\n", node->id); nuclear@6: nuclear@6: if(!tpool->work_list) { nuclear@6: tpool->work_list = tpool->work_list_tail = node; nuclear@6: } else { nuclear@6: tpool->work_list_tail->next = node; nuclear@6: tpool->work_list_tail = node; nuclear@6: } nuclear@12: pthread_mutex_unlock(&tpool->work_lock); nuclear@6: nuclear@12: /* wakeup all threads, there's work to do */ nuclear@12: pthread_cond_broadcast(&tpool->work_cond); nuclear@6: return 0; nuclear@6: } nuclear@6: nuclear@6: nuclear@6: static void *thread_func(void *tp) nuclear@6: { nuclear@15: int i, tidx = -1; nuclear@6: struct work_item *job; nuclear@6: struct thread_pool *tpool = tp; nuclear@15: pthread_t tid = pthread_self(); nuclear@15: nuclear@15: for(i=0; inum_workers; i++) { nuclear@15: if(tpool[i].workers[i] == tid) { nuclear@15: tidx = i; nuclear@15: break; nuclear@15: } nuclear@15: } nuclear@6: nuclear@6: pthread_mutex_lock(&tpool->work_lock); nuclear@6: for(;;) { nuclear@6: /* while there aren't any work items to do go to sleep on the condvar */ nuclear@6: pthread_cond_wait(&tpool->work_cond, &tpool->work_lock); nuclear@6: if(!tpool->work_list) { nuclear@6: continue; /* spurious wakeup, go back to sleep */ nuclear@6: } nuclear@6: nuclear@15: printf("TPOOL: worker %d start job: %d\n", tidx, job->id); nuclear@15: nuclear@6: job = tpool->work_list; nuclear@6: tpool->work_list = tpool->work_list->next; nuclear@6: nuclear@6: tpool->work_func(job->data, tpool->cls); nuclear@9: nuclear@15: printf("TPOOL: worker %d completed job: %d\n", tidx, job->id); nuclear@9: free_node(job); nuclear@6: } nuclear@6: pthread_mutex_unlock(&tpool->work_lock); nuclear@6: return 0; nuclear@6: } nuclear@6: nuclear@6: /* TODO: custom allocator */ nuclear@6: static struct work_item *alloc_node(void) nuclear@6: { nuclear@6: return malloc(sizeof(struct work_item)); nuclear@6: } nuclear@6: nuclear@6: static void free_node(struct work_item *node) nuclear@6: { nuclear@6: free(node); nuclear@6: } nuclear@6: nuclear@6: /* The following highly platform-specific code detects the number nuclear@6: * of processors available in the system. It's used by the thread pool nuclear@6: * to autodetect how many threads to spawn. nuclear@6: * Currently works on: Linux, BSD, Darwin, and Windows. nuclear@6: */ nuclear@6: nuclear@6: #if defined(__APPLE__) && defined(__MACH__) nuclear@6: # ifndef __unix__ nuclear@6: # define __unix__ 1 nuclear@6: # endif /* unix */ nuclear@6: # ifndef __bsd__ nuclear@6: # define __bsd__ 1 nuclear@6: # endif /* bsd */ nuclear@6: #endif /* apple */ nuclear@6: nuclear@6: #if defined(unix) || defined(__unix__) nuclear@6: #include nuclear@6: nuclear@6: # ifdef __bsd__ nuclear@6: # include nuclear@6: # endif nuclear@6: #endif nuclear@6: nuclear@6: #if defined(WIN32) || defined(__WIN32__) nuclear@6: #include nuclear@6: #endif nuclear@6: nuclear@6: nuclear@6: static int get_processor_count(void) nuclear@6: { nuclear@6: #if defined(unix) || defined(__unix__) nuclear@6: # if defined(__bsd__) nuclear@6: /* BSD systems provide the num.processors through sysctl */ nuclear@6: int num, mib[] = {CTL_HW, HW_NCPU}; nuclear@6: size_t len = sizeof num; nuclear@6: nuclear@6: sysctl(mib, 2, &num, &len, 0, 0); nuclear@6: return num; nuclear@6: nuclear@6: # elif defined(__sgi) nuclear@6: /* SGI IRIX flavour of the _SC_NPROC_ONLN sysconf */ nuclear@6: return sysconf(_SC_NPROC_ONLN); nuclear@6: # else nuclear@6: /* Linux (and others?) have the _SC_NPROCESSORS_ONLN sysconf */ nuclear@6: return sysconf(_SC_NPROCESSORS_ONLN); nuclear@6: # endif /* bsd/sgi/other */ nuclear@6: nuclear@6: #elif defined(WIN32) || defined(__WIN32__) nuclear@6: /* under windows we need to call GetSystemInfo */ nuclear@6: SYSTEM_INFO info; nuclear@6: GetSystemInfo(&info); nuclear@6: return info.dwNumberOfProcessors; nuclear@6: #endif nuclear@6: }