6.6.2 常见的并发网络服务程序设计方案(5)
如果要改成聊天服务,重新定义handle_input 函数即可,程序的其余部分保持不变。
- $ diff echo-reactor.py chat-reactor.py -U1
- def handle_input(socket, data):
- - socket.send(data) # sendall() partial
- + for (fd, other_socket) in connections.iteritems():
- + if other_socket != socket:
- + other_socket.send(data) # sendall() partial
必须说明的是,完善的非阻塞IO 网络库远比上面的玩具代码复杂,需要考虑各种错误场景。特别是要真正接管数据的收发,而不是像上面的示例那样直接在事件处理回调函数中发送网络数据。
注意在使用非阻塞IO +事件驱动方式编程(www.cppentry.com)的时候,一定要注意避免在事件回调中执行耗时的操作,包括阻塞IO 等,否则会影响程序的响应。这和Windows GUI消息循环非常类似。
方案6 这是一个过渡方案,收到Sudoku 请求之后,不在Reactor 线程计算,而是创建一个新线程去计算,以充分利用多核CPU。这是非常初级的多线程应用,因为它为每个请求(而不是每个连接)创建了一个新线程。这个开销可以用线程池来避免,即方案8。这个方案还有一个特点是out-of-order,即同时创建多个线程去计算同一个连接上收到的多个请求,那么算出结果的次序是不确定的,可能第2 个Sudoku 比较简单,比第1 个先算出结果。这也是我们在一开始设计协议的时候使用了id 的原因,以便客户端区分response 对应的是哪个request。
方案7 为了让返回结果的顺序确定,我们可以为每个连接创建一个计算线程,每个连接上的请求固定发给同一个线程去算,先到先得。这也是一个过渡方案,因为并发连接数受限于线程数目,这个方案或许还不如直接使用阻塞IO 的thread-per-connection 方案2。
方案7 与方案6 的另外一个区别是单个client 的最大CPU 占用率。在方案6 中,一个TCP 连接上发来的一长串突发请求(burst requests)可以占满全部8 个core;而在方案7 中,由于每个连接上的请求固定由同一个线程处理,那么它最多占用12.5%的CPU 资源。这两种方案各有优劣,取决于应用场景的需要(到底是公平性重要还是突发性能重要)。这个区别在方案8 和方案9 中同样存在,需要根据应用来取舍。
方案8 为了弥补方案6 中为每个请求创建线程的缺陷,我们使用固定大小线程池,程序结构如图6-12 所示。全部的IO 工作都在一个Reactor 线程完成,而计算任务交给thread pool。如果计算任务彼此独立,而且IO 的压力不大,那么这种方案是非常适用的。Sudoku Solver 正好符合。代码参见:xamples/sudoku/server_threadpool.cc。
|
| 图6-12 |
方案8 使用线程池的代码与单线程Reactor 的方案5 相比变化不大,只是把原来onMessage() 中涉及计算和发回响应的部分抽出来做成一个函数,然后交给ThreadPool 去计算。记住方案8 有乱序返回的可能,客户端要根据id 来匹配响应。- $ diff server_basic.cc server_threadpool.cc -u
- --- server_basic.cc 2012-04-20 20:19:56.000000000 +0800
- +++ server_threadpool.cc 2012-06-10 22:15:02.000000000 +0800
- @@ -96,16 +100,7 @@ void onMessage(const TcpConnectionPtr& conn, ...
- if (puzzle.size() == implicit_cast<size_t>(kCells))
- {
- - string result = solveSudoku(puzzle);
- - if (id.empty())
- - {
- - conn->send(result+"\r\n");
- - }
- - else
- - {
- - conn->send(id+":"+result+"\r\n");
- - }
- + threadPool_.run(boost::bind(&solve, conn, puzzle, id));
- }
- @@ -114,17 +109,40 @@
- + static void solve(const TcpConnectionPtr& conn,
- + const string& puzzle,
- + const string& id)
- + {
- + string result = solveSudoku(puzzle);
- + if (id.empty())
- + {
- + conn->send(result+"\r\n");
- + }
- + else
- + {
- + conn->send(id+":"+result+"\r\n");
- + }
- + }
- +
- EventLoop* loop_;
- TcpServer server_;
- + ThreadPool threadPool_;
- Timestamp startTime_;
- };