设为首页 加入收藏

TOP

2.4.1 AIO创建的TimeServer源码分析(2)
2014-07-09 13:01:23 来源: 作者: 【 】 浏览:157
Tags:2.4.1 AIO 创建 TimeServer 源码 分析

2.4.1  AIO创建的TimeServer源码分析(2)

链路建立成功之后,服务端需要接收客户端的请求消息,在代码第19行我们创建新的ByteBuffer,预分配1M的缓冲区。第20行我们通过调用AsynchronousSocketChannel的read方法进行异步读操作。下面我们看看异步read方法的参数。

ByteBuffer dst:接收缓冲区,用于从异步Channel中读取数据包;

A attachment:异步Channel携带的附件,通知回调的时候作为入参使用;

CompletionHandler<Integer, super A>:接收通知回调的业务handler,本例程中为ReadCompletionHandler。

下面我们继续对ReadCompletionHandler进行分析。

代码清单2-14  AIO时间服务器服务端  ReadCompletionHandler
 

  1. 8.    
  2. 9.  /**  
  3. 10.  * @author lilinfeng  
  4. 11.  * @date 2014年2月16日  
  5. 12.  * @version 1.0  
  6. 13.  */  
  7. 14. public class ReadCompletionHandler implements  
  8. 15.     CompletionHandler<Integer, ByteBuffer> {  
  9. 16.   
  10. 17.     private AsynchronousSocketChannel channel;  
  11. 18.   
  12. 19.     public ReadCompletionHandler(AsynchronousSocketChannel channel) {  
  13. 20.     if (this.channel == null)  
  14. 21.         this.channel = channel;  
  15. 22.     }  
  16. 23.   
  17. 24.     @Override  
  18. 25.     public void completed(Integer result, ByteBuffer attachment) {  
  19. 26.     attachment.flip();  
  20. 27.     byte[] body = new byte[attachment.remaining()];  
  21. 28.     attachment.get(body);  
  22. 29.     try {  
  23. 30.         String req = new String(body, "UTF-8");  
  24. 31.         System.out.println("The time server receive order : " + req);  
  25. 32.         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)   new java.util.Date(  
  26. 33.             System.currentTimeMillis()).toString() : "BAD ORDER";  
  27. 34.         doWrite(currentTime);  
  28. 35.     } catch (UnsupportedEncodingException e) {  
  29. 36.         e.printStackTrace();  
  30. 37.     }  
  31. 38.     }  
  32. 39.   
  33. 40.     private void doWrite(String currentTime) {  
  34. 41.     if (currentTime != null && currentTime.trim().length() > 0) {  
  35. 42.         byte[] bytes = (currentTime).getBytes();  
  36. 43.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
  37. 44.         writeBuffer.put(bytes);  
  38. 45.         writeBuffer.flip();  
  39. 46.         channel.write(writeBuffer, writeBuffer,  
  40. 47.             new CompletionHandler<Integer, ByteBuffer>() {  
  41. 48.             @Override  
  42. 49.             public void completed(Integer result, ByteBuffer buffer) {  
  43. 50.                 // 如果没有发送完成,继续发送  
  44. 51.                 if (buffer.hasRemaining())  
  45. 52.                 channel.write(buffer, buffer, this);  
  46. 53.             }  
  47. 54.   
  48. 55.             @Override  
  49. 56.             public void failed(Throwable exc, ByteBuffer attachment) {  
  50. 57.                 try {  
  51. 58.                 channel.close();  
  52. 59.                 } catch (IOException e) {  
  53. 60.                 // ingnore on close  
  54. 61.                 }  
  55. 62.             }  
  56. 63.             });  
  57. 64.     }  
  58. 65.     }  
  59. 66.   
  60. 67.     @Override  
  61. 68.     public void failed(Throwable exc, ByteBuffer attachment) {  
  62. 69.     try {  
  63. 70.         this.channel.close();  
  64. 71.     } catch (IOException e) {  
  65. 72.         e.printStackTrace();  
  66. 73.     }  
  67. 74.     }  
  68. 75. } 

首先看构造方法,我们将AsynchronousSocketChannel通过参数传递到ReadCompletion Handler中当作成员变量来使用,主要用于读取半包消息和发送应答。本例程不对半包读写进行具体说明,对此感兴趣的读者可以关注后续章节对Netty半包处理的专题介绍。我们继续看代码,第25~38行是读取到消息后的处理,首先对attachment进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数组,然后通过new String方法创建请求消息,对请求消息进行判断,如果是"QUERY TIME ORDER"则获取当前系统服务器的时间,调用doWrite方法发送给客户端。下面我们对doWrite方法进行详细分析。

跳到代码第41行,首先对当前时间进行合法性校验,如果合法,调用字符串的解码方法将应答消息编码成字节数组,然后将它复制到发送缓冲区writeBuffer中,最后调用AsynchronousSocketChannel的异步write方法。正如前面介绍的异步read方法一样,它也有三个与read方法相同的参数,在本例程中我们直接实现write方法的异步回调接口CompletionHandler。代码跳到第51行,对发送的writeBuffer进行判断,如果还有剩余的字节可写,说明没有发送完成,需要继续发送,直到发送成功。

最后,我们关注下failed方法,它的实现很简单,就是当发生异常的时候,对异常Throwable进行判断,如果是I/O异常,就关闭链路,释放资源,如果是其他异常,按照业务自己的逻辑进行处理。本例程作为简单demo,没有对异常进行分类判断,只要发生了读写异常,就关闭链路,释放资源。

异步非阻塞I/O版本的时间服务器服务端已经介绍完毕,下面我们继续看客户端的实现。
 

喜欢的朋友可以添加我们的微信账号:

51CTO读书频道二维码


51CTO读书频道活动讨论群:342347198

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇C语言程序编译过程 下一篇2.4.2 AIO创建的TimeClient源码分..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·C语言指针从入门到基 (2025-12-26 05:21:36)
·【C语言指针初阶】C (2025-12-26 05:21:33)
·C语言指针的定义和使 (2025-12-26 05:21:31)
·在 Redis 中如何查看 (2025-12-26 03:19:03)
·Redis在实际应用中, (2025-12-26 03:19:01)