设为首页 加入收藏

TOP

http线程池的设计与实现(c++)
2015-07-20 17:16:05 来源: 作者: 【 】 浏览:2
Tags:http 线程 池的 设计 实现

http线程池的主要用途是异步处理使用无状态短连接的http请求,在传输层通信基于tcp协议和应用层基于http协议的基础上,达到c++服务器与web服务器通信的目的。

设计上:

(1)服务器启动时,初始化配置数量的线程(形成被动连接线程池)。每个线程会生成epoll描述符。

(2)主线程生成监听socket,绑定端口。生成epoll描述符,注册监听socket,非阻塞接收(限定最大时间,如2s)新连接到连接队列。

(2)投放主线程连接队列中的新连接到被动连接线程池。根据硬哈希选择需求的线程来投放。加入后需要注册连接socket(注册时连接对象作为epoll事件的携带数据)到线程的epoll描述符。

(3)在每个线程的例程里会非阻塞监听epoll描述符上发生的读事件,并解析和处理获取的http请求。

这样每个业务线程可以相对独立的处理无状态的http请求。跟单业务线程的场景不同的是,http线程池的线程之间尽量减少数据共享(实在需要缓存在内存则加锁),每个线程又可以作为客户端短时间阻塞向其他服务器请求数据。

http线程池代码如下:(大致上http线程池的思路可以看得出来。主线程接收连接对象和连接对象接收数据并没有在这里展现实现过程。注意接收时需要忽略EINTR和SIGPIPE信号,如果接收返回-1且错误号为EAGAIN或EWOULDBLOCK,说明接收缓冲区满了,需要继续尝试接收,直到超时。接收返回0则表示对方断开连接,则接收失败。接收成功、失败、超时都需要移除连接对象(epoll描述符注销连接socket、关闭socket、移出和销毁连接对象),因为是短连接)

线程池头文件

/**
 * rief 定义实现轻量级(lightweight)的http服务框架类
 */
class zHttpTaskPool : private zNoncopyable
{
public:
	/**
	 * rief 构造函数
	 */
	zHttpTaskPool()
	{
	}
	/**
	 * rief 析构函数,销毁一个线程池对象
	 *
	 */
	~zHttpTaskPool()
	{
		final();
	}
	bool addHttp(zHttpTask *task);
	bool init();
	void final();	
private:
	static const int maxHttpThreads = 16;					/**< 最大验证线程数量 */
	zThreadGroup httpThreads;						/**< http服务处理线程组 */
};

线程池源文件

?

?

/**
* rief 轻量级http服务的主处理线程
*/
class zHttpThread : public zThread
{
private:
	/**
	 * rief http连接任务链表类型
	 */
	typedef std::list
  
    zHttpTaskContainer;
	/**
	 * rief epoll事件结构向量类型
	 */www.2cto.com
	typedef std::vector
   
     epollfdContainer; zHttpTaskPool *pool; /**< 所属的池 */ zRTime currentTime; /**< 当前时间 */ zMutex mutex; /**< 互斥变量 */ zHttpTaskContainer tasks; /**< 任务列表 */ int kdpfd; epollfdContainer epfds; epollfdContainer::size_type fds_count; public: /** * rief 构造函数 * param pool 所属的连接池 * param name 线程名称 */ zHttpThread( zHttpTaskPool *pool, const std::string &name = std::string(zHttpThread)) : zThread(name), pool(pool), currentTime() { kdpfd = epoll_create(256); assert(-1 != kdpfd); epfds.resize(256); fds_count = 0; } /** * rief 析构函数 */ ~zHttpThread() { TEMP_FAILURE_RETRY(::close(kdpfd)); } void run(); /** * rief 添加一个连接任务 * param task 连接任务 */ void add(zHttpTask *task) { mutex.lock(); task->addEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI, (void *)task); tasks.push_back(task); ++fds_count; if (fds_count > epfds.size()) { epfds.resize(fds_count + 16); } mutex.unlock(); } typedef zHttpTask* zHttpTaskP; void remove(zHttpTaskP &task) { task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI); tasks.remove(task); SAFE_DELETE(task); fds_count--; } void remove(zHttpTaskContainer::iterator &it) { zHttpTask *task = *it; task->delEpoll(kdpfd, EPOLLIN | EPOLLERR | EPOLLPRI); tasks.erase(it); SAFE_DELETE(task); fds_count--; } }; /** * rief http线程例程 */ void zHttpThread::run() { zHttpTaskContainer::iterator it, next; while(!isFinal()) { mutex.lock(); if (!tasks.empty()) { int retcode = epoll_wait(kdpfd, &epfds[0], fds_count, 0); if (retcode > 0) { for(int i = 0; i < retcode; ++i) { zHttpTask *task = (zHttpTask *)epfds[i].data.ptr;//获取epoll事件的数据,即连接对象 if (epfds[i].events & (EPOLLERR | EPOLLPRI))//检查epoll事件是否有错 { //套接口出现错误 remove(task); } else if (epfds[i].events & EPOLLIN)//检查epoll事件是否是读事件 { switch(task->httpCore())//阻塞recv连接对象缓冲区数据 { case 1: //接收成功 case -1: //接收失败 remove(task); break; case 0: //接收超时, break; } } } } currentTime.now(); for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next)//检查短连接的连接对象,移除超时的连接对象 { zHttpTask *task = *it; if (task->checkHttpTimeout(currentTime)) { //超过指定时间验证还没有通过,需要回收连接 remove(it); } } } mutex.unlock(); zThread::msleep(50); } //把所有等待验证队列中的连接加入到回收队列中,回收这些连接 for(it = tasks.begin(), next = it, ++next; it != tasks.end(); it = next, ++next) { remove(it); } } /** * rief 把一个TCP连接添加到验证队列中,因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中 * param task 一个连接任务 */ bool zHttpTaskPool::addHttp(zHttpTask *task) { //因为存在多个验证队列,需要按照一定的算法添加到不同的验证处理队列中 static unsigned int hashcode = 0; zHttpThread *pHttpThread = (zHttpThread *)httpThreads.getByIndex(hashcode++ % maxHttpThreads); if (pHttpThread) pHttpThread->add(task); return true; } /** * rief 初始化线程池,预先创建各种线程 * eturn 初始化是否成功 */ bool zHttpTaskPool::init() { //创建初始化验证线程 for(int i = 0; i < maxHttpThreads; ++i) { std::ostringstream name; name << zHttpThread[ << i << ]; zHttpThread *pHttpThread = new zHttpThread(this, name.str()); if (NULL == pHttpThread) return false; if (!pHttpThread->start()) return false; httpThreads.add(pHttpThread); } return true; } /** * rief 释放线程池,释放各种资源,等待各种线程退出 */ void zHttpTaskPool::final() { httpThreads.joinAll(); }
   
  


?

?

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇1015. Reversible Primes (20) 下一篇1014. Waiting in Line (30)

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·Redis on AWS:Elast (2025-12-27 04:19:30)
·在 Spring Boot 项目 (2025-12-27 04:19:27)
·使用华为开发者空间 (2025-12-27 04:19:24)
·Getting Started wit (2025-12-27 03:49:24)
·Ubuntu 上最好用的中 (2025-12-27 03:49:20)