异步之生产者消费者模型--同步缓冲区的实现

2014-11-24 02:00:41 · 作者: · 浏览: 1
适用于多个生产线程和多个消费线程之间的协作,生产者将信息放入同步缓冲区,消费者从该缓冲区中读取进行操作,可以指定特殊的“产品”来指示工作线程退出。
#ifndef __PRODUCER_CONSUMER_HPP_  
#define __PRODUCER_CONSUMER_HPP_  
#include   
class NonCopyable{  
protected:  
    NonCopyable(){}  
    ~NonCopyable(){}  
private:  
    NonCopyable(const NonCopyable&);  
    const NonCopyable& operator=(const NonCopyable&);  
};  
  
template  
class SyncBuffer  
    : public NonCopyable  
{  
public:  
    typedef typename T SyncData;  
  
    SyncBuffer(long max_size = 512)  
        : max_size_(max_size){  
        InitializeCriticalSectionAndSpinCount(&cs_, 4000);  
        empty_slot_event_ = CreateSemaphore(NULL, max_size_, max_size_, "empty_event");  
        full_slot_event_ = CreateSemaphore(NULL, 0, max_size_, "full_event");  
    }  
  
    ~SyncBuffer(){  
        DeleteCriticalSection(&cs_);  
        CloseHandle(empty_slot_event_);  
        CloseHandle(full_slot_event_);  
    }  
  
    void push_front(const SyncData& data){  
        get_cs_for_add();  
        buffer_.push_back(data);  
        get_cs_for_add(false);  
    }  
  
    void push_back(const SyncData& data){  
        get_cs_for_add();  
        buffer_.push_back(data);  
        get_cs_for_add(false);  
    }  
  
    void pop_front(SyncData& data){  
        get_cs_for_remove();  
        data = buffer_.front();  
        buffer_.pop_front();  
        get_cs_for_remove(false);  
    }  
  
    void pop_back(SyncData& data){  
        get_cs_for_remove();  
        data = buffer_.back();  
        buffer_.pop_back();  
        get_cs_for_remove(false);  
    }  
private:  
    void get_cs_for_add(bool isget = true){  
        if(isget){  
            WaitForSingleObject(empty_slot_event_, INFINITE);  
            EnterCriticalSection(&cs_);  
        }else{  
            LeaveCriticalSection(&cs_);  
            ReleaseSemaphore(full_slot_event_, 1, NULL);  
        }  
    }  
  
    void get_cs_for_remove(bool isget = true){  
        if(isget){  
            WaitForSingleObject(full_slot_event_, INFINITE);  
            EnterCriticalSection(&cs_);  
        }else{  
            LeaveCriticalSection(&cs_);  
            ReleaseSemaphore(empty_slot_event_, 1, NULL);  
        }  
    }  
  
    std::deque
buffer_; long max_size_; HANDLE empty_slot_event_; HANDLE full_slot_event_; CRITICAL_SECTION cs_; }; #endif // __PRODUCER_CONSUMER_HPP_