.6.2 常见的并发网络服务程序设计方案(4)
Doug Schmidt 指出,其实网络编程(www.cppentry.com)中有很多是事务性(routine)的工作,可以提取为公用的框架或库,而用户只需要填上关键的业务逻辑代码,并将回调注册到框架中,就可以实现完整的网络服务,这正是Reactor 模式的主要思想。
如果用传统Windows GUI 消息循环来做一个类比,那么我们前面展示IO multiplexing的做法相当于把程序的全部逻辑都放到了窗口过程(WndProc)的一个巨大的switch-case 语句中,这种做法无疑是不利于扩展的。(各种GUI 框架在此各显神通。)
- 1 LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)
- 2 {
- 3 switch (message)
- 4 {
- 5 case WM_DESTROY:
- 6 PostQuitMessage(0);
- 7 return 0;
- 8 // many more cases
- 9 }
- 10 return DefWindowProc (hwnd, message, wParam, lParam) ;
- 11 }
而Reactor 的意义在于将消息(IO 事件)分发到用户提供的处理函数,并保持网络部分的通用代码不变,独立于用户的业务逻辑。
单线程Reactor 的程序执行顺序如图6-11 (左图)所示。在没有事件的时候,线程等待在select/poll/epoll_wait 等函数上。事件到达后由网络库处理IO,再把消息通知(回调)客户端代码。Reactor 事件循环所在的线程通常叫IO 线程。通常由网络库负责读写socket,用户代码负载解码、计算、编码。
注意由于只有一个线程,因此事件是顺序处理的,一个线程同时只能做一件事情。在这种协作式多任务中,事件的优先级得不到保证,因为从“poll 返回之后”到“下一次调用poll 进入等待之前”这段时间内,线程不会被其他连接上的数据或事件抢占(见图6-11 的右图)。如果我们想要延迟计算(把compute() 推迟100ms),那么也不能用sleep() 之类的阻塞调用,而应该注册超时回调,以避免阻塞当前IO 线程。
|
| (点击查看大图)图6-11 |
方案5 基本的单线程Reactor 方案(见图6-11),即前面的server_basic.cc 程序。本文以它作为对比其他方案的基准点。这种方案的优点是由网络库搞定数据收发,程序只关心业务逻辑;缺点在前面已经谈了:适合IO 密集的应用,不太适合CPU 密集的应用,因为较难发挥多核的威力。另外,与方案2 相比,方案5 处理网络消息的延迟可能要略大一些,因为方案2 直接一次read(2) 系统调用就能拿到请求数据,而方案5 要先poll(2) 再read(2),多了一次系统调用。
这里用一小段Python 代码展示Reactor 模式的雏形。为了节省篇幅,这里直接使用了全局变量,也没有处理异常。程序的核心仍然是事件循环(L42~L46),与前面不同的是,事件的处理通过handlers 转发到各个函数中,不再集中在一坨。例如listening fd 的处理函数是handle_accept,它会注册客户连接的handler。普通客户连接的处理函数是handle_request,其中又把连接断开和数据到达这两个事件分开,后者由handle_input 处理。业务逻辑位于单独的handle_input 函数,实现了分离。
- recipes/python/echo-reactor.py
- 6 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- 7 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- 8 server_socket.bind(('', 2007))
- 9 server_socket.listen(5)
- 10 # serversocket.setblocking(0)
- 11
- 12 poll = select.poll() # epoll() should work the same
- 13 connections = {}
- 14 handlers = {}
- 15
- 16 def handle_input(socket, data):
- 17 socket.send(data) # sendall() partial
- 18
- 19 def handle_request(fileno, event):
- 20 if event & select.POLLIN:
- 21 client_socket = connections[fileno]
- 22 data = client_socket.recv(4096)
- 23 if data:
- 24 handle_input(client_socket, data)
- 25 else:
- 26 poll.unregister(fileno)
- 27 client_socket.close()
- 28 del connections[fileno]
- 29 del handlers[fileno]
- 30
- 31 def handle_accept(fileno, event):
- 32 (client_socket, client_address) = server_socket.accept()
- 33 print "got connection from", client_address
- 34 # client_socket.setblocking(0)
- 35 poll.register(client_socket.fileno(), select.POLLIN)
- 36 connections[client_socket.fileno()] = client_socket
- 37 handlers[client_socket.fileno()] = handle_request
- 38
- 39 poll.register(server_socket.fileno(), select.POLLIN)
- 40 handlers[server_socket.fileno()] = handle_accept
- 41
- 42 while True:
- 43 events = poll.poll(10000) # 10 seconds
- 44 for fileno, event in events:
- 45 handler = handlers[fileno]
- 46 handler(fileno, event)
- recipes/python/echo-reactor.py