2.4.2 AIO创建的TimeClient源码分析(1)
首先看下客户端主函数的实现。
代码清单2-15 AIO时间服务器客户端 TimeClient
- 16. try {
- 17. port = Integer.valueOf(args[0]);
- 18. } catch (NumberFormatException e) {
- 19. // 采用默认值
- 20. }
- 21. }
- 22. new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
- 23. "AIO-AsyncTimeClientHandler-001").start();
- 24.
- 25. }
- 26. }
第22行我们通过一个独立的I/O线程创建异步时间服务器客户端handler,在实际项目中,我们不需要独立的线程创建异步连接对象,因为底层都是通过JDK的系统回调实现的,在后面运行时间服务器程序的时候,我们会抓取线程调用堆栈给大家展示。
继续看代码,AsyncTimeClientHandler的实现类源码如下。
代码清单2-16 AIO时间服务器客户端AsyncTimeClientHandler
- 1. package com.phei.netty.aio;
- 2.
- 3. import java.io.IOException;
- 4. import java.io.UnsupportedEncodingException;
- 5. import java.net.InetSocketAddress;
- 6. import java.nio.ByteBuffer;
- 7. import java.nio.channels.AsynchronousSocketChannel;
- 8. import java.nio.channels.CompletionHandler;
- 9. import java.util.concurrent.CountDownLatch;
- 10.
- 11. /**
- 12. * @author Administrator
- 13. * @date 2014年2月16日
- 14. * @version 1.0
- 15. */
- 16. public class AsyncTimeClientHandler implements
- 17. CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
- 18.
- 19. private AsynchronousSocketChannel client;
- 20. private String host;
- 21. private int port;
- 22. private CountDownLatch latch;
- 23.
- 24. public AsyncTimeClientHandler(String host, int port) {
- 25. this.host = host;
- 26. this.port = port;
- 27. try {
- 28. client = AsynchronousSocketChannel.open();
- 29. } catch (IOException e) {
- 30. e.printStackTrace();
- 31. }
- 32. }
- 33.
- 34. @Override
- 35. public void run() {
- 36. latch = new CountDownLatch(1);
- 37. client.connect(new InetSocketAddress(host, port), this, this);
- 38. try {
- 39. latch.await();
- 40. } catch (InterruptedException e1) {
- 41. e1.printStackTrace();
- 42. }
- 43. try {
- 44. client.close();
- 45. } catch (IOException e) {
- 46. e.printStackTrace();
- 47. }
- 48. }
- 49.
- 50. @Override
- 51. public void completed(Void result, AsyncTimeClientHandler attachment) {
- 52. byte[] req = "QUERY TIME ORDER".getBytes();
- 53. ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
- 54. writeBuffer.put(req);
- 55. writeBuffer.flip();
- 56. client.write(writeBuffer, writeBuffer,
- 57. new CompletionHandler<Integer, ByteBuffer>() {
- 58. @Override
- 59. public void completed(Integer result, ByteBuffer buffer) {
- 60. if (buffer.hasRemaining()) {
- 61. client.write(buffer, buffer, this);
- 62. } else {
- 63. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- 64. client.read(
- 65. readBuffer,
- 66. readBuffer,
- 67. new CompletionHandler<Integer, ByteBuffer>() {
- 68. @Override
- 69. public void completed(Integer result,
- 70. ByteBuffer buffer) {
- 71. buffer.flip();
- 72. byte[] bytes = new byte[buffer
- 73. .remaining()];
- 74. buffer.get(bytes);
- 75. String body;
- 76. try {
- 77. body = new String(bytes,
- 78. "UTF-8");
- 79. System.out.println("Now is : "
- 80. + body);
- 81. latch.countDown();
- 82. } catch (UnsupportedEncodingException e) {
- 83. e.printStackTrace();
- 84. }
- 85. }
- 86.
- 87. @Override
- 88. public void failed(Throwable exc,
- 89. ByteBuffer attachment) {
- 90. try {
- 91. client.close();
- 92. latch.countDown();
- 93. } catch (IOException e) {
- 94. // ingnore on close
- 95. }
- 96. }
- 97. });
- 98. }
- 99. }
- 100.
- 101. @Override
- 102. public void failed(Throwable exc,ByteBuffer attachment) {
- 103. try {
- 104. client.close();
- 105. latch.countDown();
- 106. } catch (IOException e) {
- 107. // ingnore on close
- 108. }
- 109. }
- 110. });
- 111. }
- 112.
- 113. @Override
- 114. public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
- 115. exc.printStackTrace();
- 116. try {
- 117. client.close();
- 118. latch.countDown();
- 119. } catch (IOException e) {
- 120. e.printStackTrace();
- 121. }
- 122. }
- 123. }
喜欢的朋友可以添加我们的微信账号:
51CTO读书频道二维码

51CTO读书频道活动讨论群:342347198