本文共 8123 字,大约阅读时间需要 27 分钟。
JDK7 虽然已经发布了一段时间了,但是无奈,AIO相关介绍,尤其是靠谱儿的介绍实在是太少了。兄弟花了些时间,整理成册,希望对learner有些帮助。
epoll 成为 Linux 下开发大并发 Web 服务器的首选已经好多年了,Java 世界里,直到 JDK 7 的 AIO 出现才用上了这个 feature。哎!不过亡羊补牢,为时未晚,下面就看下用 AIO 开发一个简单的 TCP Server 和 TCP Client。
1. 代码结构如下,一共由 6 个文件组成:
2. Demo测试效果:
3. TCP Server 由三个文件组成。
AioTcpServer是主文件
AioAcceptHandler负责接收连接,采用递归模型
AioReadHandler负责接收客户端数据,仍然是异步方式
3.1 AioTcpServer.java
package server;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Entry point of the AIO TCP server demo.
 *
 * <p>The constructor opens an {@link AsynchronousServerSocketChannel} bound to the
 * requested port inside a channel group that is created from a fixed thread pool.
 * {@link #run()} arms the first asynchronous accept and then keeps the calling
 * thread parked for a fixed amount of time.
 */
public class AioTcpServer implements Runnable {

    /** Size of the fixed thread pool handed to the channel group. */
    private static final int WORKER_THREADS = 20;

    /** How long {@link #run()} sleeps after arming the first accept, in milliseconds. */
    private static final long KEEP_ALIVE_MILLIS = 400000L;

    private final AsynchronousChannelGroup asyncChannelGroup;
    private final AsynchronousServerSocketChannel listener;

    /**
     * Creates the channel group and binds the listening channel.
     *
     * @param port local TCP port to bind the listening channel to
     * @throws Exception if the channel group cannot be created or the channel
     *                   cannot be opened or bound
     */
    public AioTcpServer(int port) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
        listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress(port));
    }

    /**
     * Starts accepting connections and then sleeps for {@link #KEEP_ALIVE_MILLIS}.
     *
     * <p>The listening channel itself is passed as the attachment so that the
     * handler can issue the next accept from its completion callback. Neither the
     * channel nor the channel group is closed when this method returns.
     */
    @Override
    public void run() {
        try {
            // Exactly one handler instance is created and registered; it re-registers
            // itself for subsequent connections.
            AioAcceptHandler acceptHandler = new AioAcceptHandler();
            listener.accept(listener, acceptHandler);
            Thread.sleep(KEEP_ALIVE_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("finished server");
        }
    }

    /**
     * Starts the server on port 9008 in a new thread.
     *
     * @param args ignored
     * @throws Exception if the server cannot be constructed
     */
    public static void main(String... args) throws Exception {
        AioTcpServer server = new AioTcpServer(9008);
        new Thread(server).start();
    }
}
3.2 AioAcceptHandler.java
package server;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class AioAcceptHandler implements CompletionHandler { public void cancelled(AsynchronousServerSocketChannel attachment) { System.out.println("cancelled"); } public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel attachment) { try { System.out.println("AioAcceptHandler.completed called"); attachment.accept(attachment, this); System.out.println("有客户端连接:" + socket.getRemoteAddress().toString()); startRead(socket); } catch (IOException e) { e.printStackTrace(); } } public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { exc.printStackTrace(); } public void startRead(AsynchronousSocketChannel socket) { ByteBuffer clientBuffer = ByteBuffer.allocate(1024); socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket)); try { } catch (Exception e) { e.printStackTrace(); } } }
3.3 AioReadHandler.java
package server;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class AioReadHandler implements CompletionHandler { private AsynchronousSocketChannel socket; public AioReadHandler(AsynchronousSocketChannel socket) { this.socket = socket; } public void cancelled(ByteBuffer attachment) { System.out.println("cancelled"); } private CharsetDecoder decoder = Charset.forName("GBK").newDecoder(); public void completed(Integer i, ByteBuffer buf) { if (i > 0) { buf.flip(); try { System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf)); buf.compact(); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } socket.read(buf, buf, this); } else if (i == -1) { try { System.out.println("客户端断线:" + socket.getRemoteAddress().toString());