C++实现线程池的经典模型(2)

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <new>
#include "multi.h"
/**
 * Builds an empty task queue: no head/tail node, size zero, shutdown flag
 * cleared. Initializes the mutex that guards the list and the condition
 * variable that out_queue() waits on, both with default attributes.
 */
worker_queue::worker_queue():head(NULL),tail(NULL),cur_size(0),destroy(false){
    pthread_mutex_init(&mutex,NULL);
    pthread_cond_init(&ready,NULL);
}

/**
 * Shuts the queue down: raises the shutdown flag, wakes every thread blocked
 * in out_queue(), frees all still-queued nodes and releases the pthread
 * primitives.
 *
 * The flag is raised while holding the mutex. A consumer that has already
 * tested the flag but has not yet reached pthread_cond_wait() still holds the
 * mutex, so the flag cannot change behind its back and the broadcast below
 * cannot be missed.
 *
 * NOTE(review): woken consumers still have to re-acquire the mutex inside
 * out_queue(); the owner of this queue should make sure no thread is still
 * inside a member function when the object is deleted.
 */
worker_queue::~worker_queue(){
    pthread_mutex_lock(&mutex);
    destroy = true ;
    pthread_mutex_unlock(&mutex);
    pthread_cond_broadcast(&ready);
    clean();
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&ready);
}
int worker_queue::clean(){ 
    worker_node* temp; 
    pthread_mutex_lock(&mutex); 
    while(NULL!=head){ 
        temp = head ; 
        head = head->next ; 
        delete temp ; 
    } 
    tail=head=NULL; 
    cur_size = 0; 
    pthread_mutex_unlock(&mutex); 
    return 0; 

int worker_queue::in_queue(const worker_node *node){ 
    worker_node *temp; 
    temp = new worker_node ; 
    if(NULL == temp) return -1 ; //new err ; 
    *temp = *node ; 
    pthread_mutex_lock(&mutex); 
 
    if(NULL==head){ 
        head=tail=temp; 
    }else{ 
        tail->next=temp ; 
        tail = temp ; 
        tail->next = NULL; 
    } 
    cur_size++; 
    pthread_mutex_unlock(&mutex); 
    pthread_cond_signal(&ready); 
    return 0 ; 

/**
 * Removes and returns the node at the head of the queue, blocking while the
 * queue is empty.
 *
 * @return the dequeued node (allocated by in_queue(); it is no longer linked
 *         into the queue, so clean() will not free it), or NULL once the
 *         shutdown flag has been raised.
 */
worker_node * worker_queue::out_queue(){ //out of the queue
    pthread_mutex_lock(&mutex);
    // Re-test the predicate after every wakeup: pthread_cond_wait() may
    // return spuriously, and another consumer may have taken the node that
    // triggered the signal before this thread re-acquired the mutex.
    while((NULL==head) && (false == destroy)){ //queue empty
        pthread_cond_wait(&ready,&mutex);
    }
    if(destroy){
        pthread_mutex_unlock(&mutex);
        return NULL;
    }
    worker_node *front = head ;
    head = head->next ;
    if(NULL == head)
        tail = NULL ; // queue drained: do not leave tail pointing at `front`
    cur_size -- ;
    pthread_mutex_unlock(&mutex);
    return front;
}

/**
 * Creates an uninitialized pool: no queue, no threads, not yet initialized
 * and not destroyed. init() performs the actual allocation.
 */
pmulti::pmulti():
is_init(false),thread_num(0),w_queue(NULL),threads(NULL),is_destroy(false){
}

/**
 * Tears the pool down if the owner initialized it but never called
 * destroy(). Without this the worker threads would keep running with a
 * pointer to a dead pmulti, and the queue and thread array would leak.
 */
pmulti::~pmulti(){
    // is_destroy is set by destroy(), so this never tears down twice.
    if(is_init && !is_destroy)
        destroy();
}

int pmulti::init(unsigned int thr_num){ 
    if(is_init) 
        return -1;//alread init   
    int i; 
    //pool = new thread_pool; 
    //if(NULL == pool)      return -2;//new error 
    //bzero(pool,sizeof(thread_pool)); 
    w_queue = new worker_queue ; 
    if(NULL == w_queue)    return -2;//new error 
    thread_num = thr_num; 
    threads = new pthread_t[thr_num] ; 
    for(i=0;i<thr_num;i++){ 
        if(0!=pthread_create((threads+i),NULL,thread_fun,(void*)this )) 
            printf("pthread_create error\n"); 
    } 
    is_destroy = false ; 
    is_init = true; 
    return 0; 

int pmulti::add_worker(const worker_node * node){ 
    return  w_queue->in_queue(node); 

int pmulti::destroy(){ 
    int i=0; 
    char* res; 
    is_destroy = true ; 
    if(NULL != w_queue)             
        delete w_queue;         
    for(i=0;i<thread_num;i++){               
        pthread_join(threads[i],(void**)&res);                 
        printf("joined :%lld  end;\n",res);         
    }         
    if(thread_num > 0)   
        delete[] threads;         
    return 0; 

void* pmulti::thread_fun(void*arg){         
    printf("thread num:%lld\n",pthread_self());         
    pmulti *pm =(pmulti*)arg;         
    worker_node * node ;         
    while(1){               
        if(pm->is_destroy)                         
            pthread_exit(NULL);                 
        node = pm->w_queue->out_queue();                 
        if((NULL == node)|| (pm->is_destroy))                         
            pthread_exit(NULL);                 
        node->process(node->arg);                 
        printf("thread_fun:%lld run\n",pthread_self());         
    }         
    return (void*)pthread_self(); 

linux

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/7780b336fff65da1cf6b41fc590fad6f.html