线程池文件有pool.c和pool.h,测试代码在test.c。测试代码比较乱,但应该足够看懂怎么用了。打包下载:pool.tar
pool.h
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
/* Largest worker count pool_init() accepts; larger requests return -1. */
#define MAX_THREAD 2000
/* A job callback: receives one opaque argument and returns an opaque result. */
typedef void *(*job_func)(void *);
/* One queued unit of work; nodes are chained into a singly linked list. */
typedef struct a_job {
job_func func; /* callback a worker thread invokes */
void *arg; /* argument handed to func unchanged */
void **result; /* where func's return value is stored; NULL discards it */
struct a_job *next; /* following job in the queue */
} job;
/* State of the thread pool: the worker threads plus the pending-job queue. */
typedef struct {
pthread_t *workers; /* heap array of worker thread ids */
int total; /* number of entries in workers; pool_destroy() joins this many */
int free; /* set to the worker count by pool_init(); not read in the visible code */
job *jobs; /* head of the job queue (next job to run) */
job *job_end; /* tail of the job queue (where new jobs are appended) */
int jobc; /* number of jobs currently queued */
pthread_mutex_t q_mutex; /* held while the queue fields are read or modified */
pthread_cond_t q_cond; /* signalled when a job is added, broadcast on destroy */
int init; /* 1 after pool_init(), reset to 0 by pool_destroy() */
int destroy; /* set to 1 by pool_destroy(); workers exit once they see it */
} thread_pool;
int pool_init(int);
int pool_add_job(job_func, void *, void *);
int pool_destroy();
void *routine(void *);
pool.c
#include "pool.h"
/*
 * The single pool instance shared by every function in this file.
 * NULL until pool_init() allocates it, and NULL again after pool_destroy().
 */
static thread_pool *pool;
int
pool_init(int count)
{
if (count > MAX_THREAD)
return -1;
if (pool != NULL)
return 0;
pool = (thread_pool *)malloc(sizeof (thread_pool));
if (pool == NULL)
return -1;
memset(pool, 0, sizeof (thread_pool));
pool->total = pool->free = count;
pool->init = 1;
pool->destroy = 0;
pool->jobc = 0;
pthread_mutex_init(&pool->q_mutex, NULL);
pthread_cond_init(&pool->q_cond, NULL);
pool->workers = (pthread_t *)malloc(sizeof (pthread_t) * count);
if (pool->workers == NULL)
return -1;
memset(pool->workers, 0, sizeof (pthread_t) * count);
int i;
for (i = 0; i < count; i ++) {
pthread_create(pool->workers + i, NULL, routine, NULL);
}
return 0;
}
int
pool_destroy()
{
if (pool == NULL)
return 0;
if (pool->destroy == 1)
return 0;
pool->destroy = 1;
pool->init = 0;
pthread_cond_broadcast(&pool->q_cond);
int i;
for (i = 0; i < pool->total; i ++) {
pthread_join(*(pool->workers + i), NULL);
}
free(pool->workers);
pthread_mutex_destroy(&pool->q_mutex);
pthread_cond_destroy(&pool->q_cond);
free(pool);
pool = NULL;
return 0;
}
/*
 * Append a job to the pool's queue and wake one worker.
 *
 * func:   callback to run; must not be NULL.
 * arg:    passed to func unchanged; must stay valid until the job has run.
 * result: NULL, or the address of a `void *` that receives func's return
 *         value; must stay valid until the job has run.
 *
 * Returns 0 when the job was queued.  Returns -1 when the pool does not
 * exist, is being destroyed, func is NULL, or memory allocation fails.
 */
int
pool_add_job(job_func func, void *arg, void *result)
{
    if (pool == NULL || func == NULL)
        return -1;

    job *new_job = malloc(sizeof *new_job);
    if (new_job == NULL)
        return -1;
    new_job->func = func;
    new_job->arg = arg;
    new_job->result = result;
    new_job->next = NULL; /* the tail of the list must be terminated */

    pthread_mutex_lock(&pool->q_mutex);
    if (pool->destroy == 1) {
        /* Workers are exiting; a job queued now would never run. */
        pthread_mutex_unlock(&pool->q_mutex);
        free(new_job);
        return -1;
    }
    /*
     * Decide "empty" from jobc, not from job_end: after the queue drains,
     * job_end may still point at a job that has already been freed.
     */
    if (pool->jobc == 0) {
        pool->jobs = pool->job_end = new_job;
    } else {
        pool->job_end->next = new_job;
        pool->job_end = new_job;
    }
    pool->jobc++;
    printf("A job added, number of jobs : %d\n", pool->jobc);
    pthread_mutex_unlock(&pool->q_mutex);

    pthread_cond_signal(&pool->q_cond);
    return 0;
}
void *
routine(void *arg)
{
pthread_cleanup_push(pthread_mutex_unlock, &pool->q_mutex);
printf("Thread 0x%x start!\n", pthread_self());
while (1) {
pthread_mutex_lock(&pool->q_mutex);
while (pool->jobc == 0 && pool->destroy != 1)
pthread_cond_wait(&pool->q_cond, &pool->q_mutex);
if (pool->destroy == 1) {
pthread_mutex_unlock(&pool->q_mutex);
printf("Thread 0x%x exit\n", pthread_self());
pthread_exit(NULL);
}
job *tmp_job = pool->jobs;
pool->jobs = pool->jobs->next;
pool->jobc --;
pthread_mutex_unlock(&pool->q_mutex);
printf("Job run by thread 0x%x\n", pthread_self());
if (tmp_job->result != NULL)
*(tmp_job->result) = (*tmp_job->func)(tmp_job->arg);
else
(*tmp_job->func)(tmp_job->arg);
free(tmp_job);
}
pthread_exit(NULL);
pthread_cleanup_pop(0);
}
test.c
#include <unistd.h>
#include "pool.h"
/*
 * Sample job: copies the int that `arg` points to into freshly allocated
 * memory, sleeps for one second to simulate work, and returns the copy.
 *
 * arg: must point to a readable int.
 *
 * Returns a heap-allocated int the caller must free(), or NULL if the
 * allocation failed.
 */
void *
func(void *arg)
{
    int *copy = malloc(sizeof *copy);
    if (copy == NULL)
        return NULL;
    *copy = *(int *)arg;
    sleep(1);
    return copy;
}
/*
 * Demo driver: starts a three-worker pool, queues one job, destroys the
 * pool and prints the job's result.
 *
 * Returns 0 when the result was printed, 1 when the pool could not be set
 * up, the job could not be queued, or the job produced no result.
 */
int
main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    if (pool_init(3) != 0) {
        fprintf(stderr, "pool_init failed\n");
        return 1;
    }
    printf("inited\n");

    int arg = 1;
    /* Stays NULL if the job never runs, so the result is checked below. */
    void *tmp = NULL;
    if (pool_add_job(func, &arg, &tmp) != 0) {
        fprintf(stderr, "pool_add_job failed\n");
        pool_destroy();
        return 1;
    }
    printf("add job finished\n");

    /*
     * pool_destroy() joins the workers, so a job that has started is
     * finished when it returns.  A job no worker had picked up yet is
     * dropped, because workers exit as soon as they see the destroy flag.
     */
    printf("going to destroy\n");
    pool_destroy();

    if (tmp == NULL) {
        printf("job did not run before the pool was destroyed\n");
        return 1;
    }
    printf("res is %d\n", *(int *)tmp);
    free(tmp);
    return 0;
}
