有限容量BlockingQueue:消费者生产者

2014-11-24 01:24:16 · 作者: · 浏览: 2
有限容量的BlockingQueue实现工作队列,用于生产者消费者问题。
// NOTE(review): the header names on the original #include lines were lost
// (everything between '<' and '>' was stripped). The list below is
// reconstructed from the identifiers this file actually uses; confirm it
// against the original source.
#include <assert.h>   // assert
#include <pthread.h>  // pthread_mutex_*, pthread_cond_*, pthread_create/join
#include <unistd.h>   // sleep, usleep

#include <iostream>   // cout, endl

#include <boost/bind.hpp>             // bind
#include <boost/circular_buffer.hpp>  // circular_buffer
#include <boost/function.hpp>         // function
#include <boost/noncopyable.hpp>      // noncopyable
using namespace std;  
using namespace boost;  
// Non-copyable wrapper around a pthread mutex.
//
// The mutex is initialised with default attributes in the constructor and
// destroyed in the destructor. Return codes of the pthread calls are not
// inspected.
class Mutex : public noncopyable
{
private:
    mutable pthread_mutex_t raw_;  // the wrapped POSIX mutex

public:
    Mutex()
    {
        pthread_mutex_init(&raw_, NULL);  // NULL => default mutex attributes
    }

    ~Mutex()
    {
        pthread_mutex_destroy(&raw_);
    }

    // Blocks until the calling thread owns the mutex.
    void lock()
    {
        pthread_mutex_lock(&raw_);
    }

    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&raw_);
    }

    // Hands out the underlying pthread handle; Condition::wait() passes it
    // to pthread_cond_wait().
    pthread_mutex_t* getMutex()
    {
        return &raw_;
    }
};
// Scoped lock: acquires the given Mutex on construction and releases it on
// destruction, so the lock is dropped on every exit path of the enclosing
// scope.
class MutexLockGuard : noncopyable
{
private:
    // Held by reference on purpose: Mutex derives from noncopyable and
    // therefore cannot be copied into the guard.
    Mutex& mutex_;

public:
    // Locks `mutex` immediately; the caller must keep it alive for the
    // lifetime of the guard.
    explicit MutexLockGuard(Mutex& mutex)
        : mutex_(mutex)
    {
        mutex_.lock();
    }

    ~MutexLockGuard()
    {
        mutex_.unlock();
    }
};
// Non-copyable wrapper around a pthread condition variable that is bound to
// one Mutex for its whole lifetime.
//
// The condition variable is initialised with default attributes and
// destroyed in the destructor. Return codes of the pthread calls are not
// inspected.
class Condition : public noncopyable
{
private:
    Mutex& mutex_;        // reference, not a copy: Mutex is noncopyable
    pthread_cond_t cond;  // the wrapped POSIX condition variable

public:
    // Binds the condition to `mutex`; the mutex must outlive this object.
    explicit Condition(Mutex& mutex)
        : mutex_(mutex)
    {
        pthread_cond_init(&cond, NULL);  // NULL => default attributes
    }

    ~Condition()
    {
        pthread_cond_destroy(&cond);
    }

    // Waits on the condition using the bound mutex. pthread_cond_wait()
    // requires that the caller already holds that mutex.
    void wait()
    {
        pthread_mutex_t* const bound = mutex_.getMutex();
        pthread_cond_wait(&cond, bound);
    }

    // Wakes at least one waiting thread, if any.
    void notify()
    {
        pthread_cond_signal(&cond);
    }

    // Wakes every waiting thread.
    void notifyALL()
    {
        pthread_cond_broadcast(&cond);
    }
};
template  
class BlockingQueue:noncopyable{  
    public:  
        explicit BlockingQueue(int x):mutex(),full(mutex),empty(mutex),Q(x){}  
        void put(T a){  
            MutexLockGuard guard(mutex);  
            while(Q.full()){//若队列满,则等待空条件empty  
                empty.wait();//等待消费者消费  
            }  
            assert(!Q.full());  
            Q.push_back(a);  
            full.notifyALL();//通知消费者  
        }  
        T take(){  
            MutexLockGuard guard(mutex);  
            while(Q.empty()){//若队列空,则等待满条件full  
                full.wait();//等待生产者生产  
            }  
            assert(!Q.empty());  
            T front(Q.front());  
            front();  
            Q.pop_front();  
            empty.notify();//通知生产者  
            return front;  
        }  
    private:  
        Mutex mutex;  
        Condition full;//满条件  
        Condition empty;//空条件  
        circular_buffer
Q;//boost的循环队列 }; class test{//任务内容 public: explicit test(int x):data(x){} void show(){ cout<<"show "< Functor;//任务T BlockingQueue taskQueue(10); bool running=true;//终止线程标志 void* producer(void* arg){//生产者线程 int i=0; while(running){ //usleep(100); test one(i++); Functor task=bind(&test::show,one);//注意bind会拷贝参数。该处不能用&one. taskQueue.put(task); } } void* customer(void* arg){//消费者线程 while(running){ Functor task=taskQueue.take(); task(); } } int main(){ pthread_t pid0; pthread_t pid[2]; pthread_create(&pid0,NULL,producer,NULL); for(int i=0;i<2;i++){ pthread_create(&pid[i],NULL,customer,NULL); } sleep(1); running=false;//终止线程 pthread_join(pid0,NULL); return 0; }