2.3.3 NIO创建的TimeServer源码分析(1)
我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码。
代码清单2-8 NIO时间服务器 TimeServer
- 9. public class TimeServer {
- 10.
- 11. /**
- 12. * @param args
- 13. * @throws IOException
- 14. */
- 15. public static void main(String[] args) throws IOException {
- 16. int port = 8080;
- 17. if (args != null && args.length > 0) {
- 18. try {
- 19. port = Integer.valueOf(args[0]);
- 20. } catch (NumberFormatException e) {
- 21. // 采用默认值
- 22. }
- 23. }
- 24. MultiplexerTimeServer timeServer = new MultiplexerTimeServer (port);
- 25. New Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
- 26. }
- 27. }
我们对NIO创建的TimeServer进行简单分析下,16~23行跟之前的一样,设置监听端口。24~25行创建了一个被称为MultiplexerTimeServer的多路复用类,它是个一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入。现在我们继续看MultiplexerTimeServer的源码。
代码清单2-8 NIO时间服务器 MultiplexerTimeServer
- 17. public class MultiplexerTimeServer implements Runnable {
- 18.
- 19. private Selector selector;
- 20.
- 21. private ServerSocketChannel servChannel;
- 22.
- 23. private volatile boolean stop;
- 24.
- 25. /**
- 26. * 初始化多路复用器、绑定监听端口
- 27. *
- 28. * @param port
- 29. */
- 30. public MultiplexerTimeServer(int port) {
- 31. try {
- 32. selector = Selector.open();
- 33. servChannel = ServerSocketChannel.open();
- 34. servChannel.configureBlocking(false);
- 35. servChannel.socket().bind(new InetSocketAddress(port), 1024);
- 36. servChannel.register(selector, SelectionKey.OP_ACCEPT);
- 37. System.out.println("The time server is start in port : " + port);
- 38. } catch (IOException e) {
- 39. e.printStackTrace();
- 40. System.exit(1);
- 41. }
- 42. }
- 43.
- 44. public void stop() {
- 45. this.stop = true;
- 46. }
- 47.
- 48. /*
- 49. * (non-Javadoc)
- 50. *
- 51. * @see java.lang.Runnable#run()
- 52. */
- 53. @Override
- 54. public void run() {
- 55. while (!stop) {
- 56. try {
- 57. selector.select(1000);
- 58. Set<SelectionKey> selectedKeys = selector.selectedKeys();
- 59. Iterator<SelectionKey> it = selectedKeys.iterator();
- 60. SelectionKey key = null;
- 61. while (it.hasNext()) {
- 62. key = it.next();
- 63. it.remove();
- 64. try {
- 65. handleInput(key);
- 66. } catch (Exception e) {
- 67. if (key != null) {
- 68. key.cancel();
- 69. if (key.channel() != null)
- 70. key.channel().close();
- 71. }
- 72. }
- 73. }
- 74. } catch (Throwable t) {
- 75. t.printStackTrace();
- 76. }
- 77. }
- 78.
- 79. // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
- 80. if (selector != null)
- 81. try {
- 82. selector.close();
- 83. } catch (IOException e) {
- 84. e.printStackTrace();
- 85. }
- 86. }
- 87.
- 88. private void handleInput(SelectionKey key) throws IOException {
- 89.
- 90. if (key.isValid()) {
- 91. // 处理新接入的请求消息
- 92. if (key.isAcceptable()) {
- 93. // Accept the new connection
- 94. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- 95. SocketChannel sc = ssc.accept();
- 96. sc.configureBlocking(false);
- 97. // Add the new connection to the selector
- 98. sc.register(selector, SelectionKey.OP_READ);
- 99. }
- 100. if (key.isReadable()) {
- 101. // Read the data
- 102. SocketChannel sc = (SocketChannel) key.channel();
- 103. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- 104. int readBytes = sc.read(readBuffer);
- 105. if (readBytes > 0) {
- 106. readBuffer.flip();
- 107. byte[] bytes = new byte[readBuffer.remaining()];
- 108. readBuffer.get(bytes);
- 109. String body = new String(bytes, "UTF-8");
- 110. System.out.println("The time server receive order : "
- 111. + body);
- 112. String currentTime = "QUERY TIME ORDER"
- 113. .equalsIgnoreCase(body) new java.util.Date(
- 114. System.currentTimeMillis()).toString()
- 115. : "BAD ORDER";
- 116. doWrite(sc, currentTime);
- 117. } else if (readBytes < 0) {
- 118. // 对端链路关闭
- 119. key.cancel();
- 120. sc.close();
- 121. } else
- 122. ; // 读到0字节,忽略
- 123. }
- 124. }
- 125. }
- 126.
- 127. private void doWrite(SocketChannel channel, String response)
- 128. throws IOException {
- 129. if (response != null && response.trim().length() > 0) {
- 130. byte[] bytes = response.getBytes();
- 131. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
- 132. writeBuffer.put(bytes);
- 133. writeBuffer.flip();
- 134. channel.write(writeBuffer);
- 135. }
- 136. }
- 137. }
喜欢的朋友可以添加我们的微信账号:
51CTO读书频道二维码

51CTO读书频道活动讨论群:342347198