C++11 多线程阻塞队列

2014-11-24 01:24:09 · 作者: · 浏览: 2

#include "stdafx.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace std;

// Bounded blocking queue.
//
// A std::deque<T> guarded by one mutex and one condition variable.
// Producers call push_back(), which never blocks: it fails immediately when
// the queue already holds m_maxcount elements. Consumers call pop_front(),
// which waits up to a caller-supplied timeout for an element to arrive.
// Every member function takes m_mutex before touching m_deque.
template <typename T>
class CBlockingDeque
{
public:
    // maxcount: maximum number of elements the queue may hold.
    // NOTE: the value is converted to size_t as-is, so a negative maxcount
    // wraps around to a very large capacity (behavior kept from the original).
    CBlockingDeque(int maxcount) : m_maxcount(static_cast<size_t>(maxcount))
    {
    }

    // Appends a copy of `t` if the queue is below capacity.
    // Returns true when the element was stored, false when the queue is full.
    // Never blocks beyond acquiring the internal mutex.
    bool push_back(const T &t)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_deque.size() >= m_maxcount)
            {
                return false;
            }
            m_deque.push_back(t);
        }
        // Notify after releasing the lock so the woken consumer does not
        // immediately block on the mutex. One element was added, so waking
        // one waiter is sufficient.
        m_cond.notify_one();
        return true;
    }

    // Removes the oldest element and stores it in `t`.
    // milliseconds: maximum time to wait for an element; values <= 0 are
    //               replaced by 10.
    // Returns true when an element was retrieved, false when the queue was
    // still empty after the timeout (`t` is left untouched in that case).
    bool pop_front(T &t, int milliseconds)
    {
        if (milliseconds <= 0)
        {
            milliseconds = 10;
        }

        std::unique_lock<std::mutex> lock(m_mutex);

        // The predicate overload re-checks the queue after every wakeup, so a
        // spurious wakeup (or another consumer taking the element first) does
        // not cut the wait short. It returns the predicate's final value.
        const bool has_item = m_cond.wait_for(
            lock,
            std::chrono::milliseconds(milliseconds),
            [this] { return !m_deque.empty(); });
        if (!has_item)
        {
            return false;
        }

        t = std::move(m_deque.front());
        m_deque.pop_front();
        return true;
    }

    // Returns the number of queued elements at the moment of the call; with
    // concurrent producers/consumers the value may be stale once returned.
    size_t size() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_deque.size();
    }

private:
    // Maximum number of elements.
    const size_t m_maxcount;
    // Guards m_deque; mutable so that size() can be const.
    mutable std::mutex m_mutex;
    // Signalled by push_back() whenever an element is added.
    std::condition_variable m_cond;
    // Underlying storage.
    std::deque<T> m_deque;
};

class CTestBlockingDeque
{
public:
void push_back(CBlockingDeque *bd,int total)
{
size_t k = 0;
for(size_t i = 0;i < total;++i)
{
std::stringstream str;
str<<"ITEM"< std::string s = str.str();
if(bd->push_back(s))
{
k++;
}
}
std::cout< }

int pop_front(CBlockingDeque *bd,int milliseconds)
{
int count = 0;
int failcount = 0;
while(failcount < 1)
{
std::string s = "";
if(bd->pop_front(s,milliseconds))
{
count++;
}
else
{
failcount++;
}
}
std::cout< return count;
}


static void start_test()
{
const int totalitem = 100000;
CBlockingDeque bd(totalitem);
CTestBlockingDeque test_bd;

//构造3个插入线程
auto fnarg = std::bind(&CTestBlockingDeque::push_back,&test_bd,&bd,std::placeholders::_1);
std::thread push_task_1(fnarg,50000);
std::thread push_task_2(fnarg,40000);
std::thread push_task_3(fnarg,30000);

//std::this_thread::sleep_for(std::chrono::milliseconds(100));

//构造2个输出线程
auto fnarg_1 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,500);
auto fnarg_2 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,100);
std::packaged_task *,int)> pos_packaged_1(fnarg_1);
std::packaged_task *,int)> pos_packaged_2(fnarg_2);
std::future future_1 = pos_packaged_1.get_future();
std::future future_2 = pos_packaged_2.get_future();
std::thread pop_task_1(std::move(pos_packaged_1),nullptr,0);
std::thread pop_task_2(std::move(pos_packaged_2),nullptr,0);

std::thread::id id_1 = pop_task_1.get_id();
std::thread::id id_2 = pop_task_2.get_id();

//等待线程都正常退出
push_task_1.join();
push_task_2.join();
push_task_3.join();
pop_task_1.join();
pop_task_2.join();

int total_1 = future_1.get();
int total_2 = future_2.get();

std::cout< std::cout< std::cout<<"pop total:"< }
};

// Program entry point (MSVC generic-text `_tmain`).
// Runs the blocking-queue demo once, then issues the shell command "pause"
// (presumably to keep the console window open until a key is pressed).
// Always returns 0.
int _tmain(int argc, _TCHAR* argv[])
{
    // The command-line arguments are not used.
    (void)argc;
    (void)argv;

    CTestBlockingDeque::start_test();

    system("pause");

    return 0;
}

VS2012 Update2 下运行结果:

75288 pop_front fail total:1
74000 push success total:30000
73720 push success total:40000
74216 push success total:50000
75228 pop_front fail total:1
75288 pop total:3837
75228 pop total:116163
pop total:120000