#include "stdafx.h"
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
using namespace std;
// Bounded, thread-safe blocking queue.
//
// push_back() never blocks: it is rejected while the queue already holds
// m_maxcount elements. pop_front() waits, up to a caller-supplied timeout,
// for an element to become available.
template<typename T>
class CBlockingDeque
{
public:
    // maxcount: maximum number of elements held at once. The value is stored
    // as an unsigned size, so a negative argument converts to a very large
    // capacity (effectively unbounded).
    CBlockingDeque(int maxcount):m_maxcount(static_cast<std::size_t>(maxcount))
    {
    }

    // Appends a copy of t if there is room.
    // Returns true when the element was stored, false when the queue is full.
    bool push_back(const T &t)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if(m_deque.size() >= m_maxcount)
        {
            return false;
        }
        m_deque.push_back(t);
        // Exactly one element was added, so waking one waiting consumer is
        // enough; every waiter re-checks the queue under the lock anyway.
        m_cond.notify_one();
        return true;
    }

    // Removes the oldest element into t, waiting up to `milliseconds` for one
    // to arrive. A timeout <= 0 is replaced by 10 ms.
    // Returns true when t was assigned, false when the wait timed out with
    // the queue still empty (t is left untouched in that case).
    bool pop_front(T &t,int milliseconds)
    {
        if(milliseconds <= 0)
        {
            milliseconds = 10;
        }
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate overload keeps waiting across spurious wake-ups and
        // across wake-ups where another consumer already took the element,
        // so false is only returned once the full timeout has elapsed.
        const bool available = m_cond.wait_for(
            lock,
            std::chrono::milliseconds(milliseconds),
            [this]{ return !m_deque.empty(); });
        if(!available)
        {
            return false;
        }
        t = std::move(m_deque.front());
        m_deque.pop_front();
        return true;
    }

    // Number of elements currently queued (a snapshot; it may be stale as
    // soon as the lock is released).
    std::size_t size() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_deque.size();
    }

private:
    // Maximum capacity.
    const std::size_t m_maxcount;
    // Guards m_deque; mutable so that size() can be const.
    mutable std::mutex m_mutex;
    // Signalled whenever an element is pushed.
    std::condition_variable m_cond;
    // Underlying storage.
    std::deque<T> m_deque;
};
class CTestBlockingDeque
{
public:
void push_back(CBlockingDeque
{
size_t k = 0;
for(size_t i = 0;i < total;++i)
{
std::stringstream str;
str<<"ITEM"< std::string s = str.str();
if(bd->push_back(s))
{
k++;
}
}
std::cout<
int pop_front(CBlockingDeque
{
int count = 0;
int failcount = 0;
while(failcount < 1)
{
std::string s = "";
if(bd->pop_front(s,milliseconds))
{
count++;
}
else
{
failcount++;
}
}
std::cout<
}
static void start_test()
{
const int totalitem = 100000;
CBlockingDeque
CTestBlockingDeque test_bd;
//构造3个插入线程
auto fnarg = std::bind(&CTestBlockingDeque::push_back,&test_bd,&bd,std::placeholders::_1);
std::thread push_task_1(fnarg,50000);
std::thread push_task_2(fnarg,40000);
std::thread push_task_3(fnarg,30000);
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
//构造2个输出线程
auto fnarg_1 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,500);
auto fnarg_2 = std::bind(&CTestBlockingDeque::pop_front,&test_bd,&bd,100);
std::packaged_task
std::packaged_task
std::future
std::future
std::thread pop_task_1(std::move(pos_packaged_1),nullptr,0);
std::thread pop_task_2(std::move(pos_packaged_2),nullptr,0);
std::thread::id id_1 = pop_task_1.get_id();
std::thread::id id_2 = pop_task_2.get_id();
//等待线程都正常退出
push_task_1.join();
push_task_2.join();
push_task_3.join();
pop_task_1.join();
pop_task_2.join();
int total_1 = future_1.get();
int total_2 = future_2.get();
std::cout<
};
// Program entry point (MSVC generic-text form). Runs the blocking-deque test
// scenario and then keeps the console window open.
int _tmain(int argc, _TCHAR* argv[])
{
    // start_test() joins all of the threads it creates before returning.
    CTestBlockingDeque::start_test();

    // Hand "pause" to the command interpreter so the results stay visible
    // when the program is launched outside an existing console.
    system("pause");

    return 0;
}
/*
Output when run under VS2012 Update 2 (each line starts with the id of the
thread that printed it):
75288 pop_front fail total:1
74000 push success total:30000
73720 push success total:40000
74216 push success total:50000
75228 pop_front fail total:1
75288 pop total:3837
75228 pop total:116163
pop total:120000
*/