JDK7 AIO (asynchronous, non-blocking I/O): implementing a high-concurrency TCPServer and TCPClient
Published: 2019-05-10


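JDK 7 has been out for a while, but unfortunately there are very few write-ups on AIO, and even fewer reliable ones. I spent some time putting these notes together and hope they help fellow learners.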

 

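epoll has been the first choice for building high-concurrency web servers on Linux for many years. In the Java world, NIO selectors have been able to sit on top of epoll for some time, but a callback-driven asynchronous channel API only arrived with AIO in JDK 7. Sigh! Still, better late than never, so let's see how to build a simple TCP server and TCP client with AIO.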

 

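1. The code consists of six files in total.

2. Demo test results:

3. The TCP server consists of three files.

   AioTcpServer is the main file.

   AioAcceptHandler accepts connections, using a recursive model: each completed accept registers the next one.

   AioReadHandler receives client data, again asynchronously.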
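
The "recursive" accept model can be tried in isolation before reading the full server. The following is only a minimal sketch, not the code of this article: the class name `AcceptLoopSketch`, the helper `acceptN`, the ephemeral loopback port, and the use of the default channel group are all assumptions made for illustration. It shows the handler calling `accept` again from inside `completed`, so that the next connection can be taken while the current one is being handled.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AcceptLoopSketch {

    /** Opens a listener on an ephemeral port, connects n times, and returns the number of accepts seen. */
    public static int acceptN(int n) throws Exception {
        final AtomicInteger accepted = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(n);
        try (AsynchronousServerSocketChannel listener =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))) {

            listener.accept(listener,
                new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
                    @Override
                    public void completed(AsynchronousSocketChannel socket,
                                          AsynchronousServerSocketChannel server) {
                        // Re-arm first: only one accept may be pending at a time,
                        // so the handler registers itself again for the next client.
                        if (server.isOpen()) {
                            server.accept(server, this);
                        }
                        accepted.incrementAndGet();
                        try {
                            socket.close();
                        } catch (IOException ignored) {
                            // nothing useful to do in a sketch
                        }
                        done.countDown();
                    }

                    @Override
                    public void failed(Throwable exc, AsynchronousServerSocketChannel server) {
                        // Reached for the last pending accept once the listener is closed.
                    }
                });

            InetSocketAddress addr = (InetSocketAddress) listener.getLocalAddress();
            for (int i = 0; i < n; i++) {
                try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                    client.connect(addr).get(5, TimeUnit.SECONDS);
                }
            }
            done.await(5, TimeUnit.SECONDS);
        }
        return accepted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted " + acceptN(3) + " connections");
    }
}
```

Re-arming before doing any per-connection work keeps the window in which no accept is pending as short as possible.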

3.1   AioTcpServer.java

package server;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AioTcpServer implements Runnable {
    private AsynchronousChannelGroup asyncChannelGroup;
    private AsynchronousServerSocketChannel listener;

    public AioTcpServer(int port) throws Exception {
        // A fixed pool of 20 threads runs the completion handlers of this group.
        ExecutorService executor = Executors.newFixedThreadPool(20);
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
        listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress(port));
    }

    public void run() {
        try {
            // Register the first accept; the handler re-registers itself for later connections.
            listener.accept(listener, new AioAcceptHandler());
            // Keep the demo thread alive for 400 seconds.
            Thread.sleep(400000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("finished server");
        }
    }

    public static void main(String... args) throws Exception {
        AioTcpServer server = new AioTcpServer(9008);
        new Thread(server).start();
    }
}
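
The server above stays alive by sleeping for a fixed time. As an alternative, here is a minimal sketch of waiting on the channel group itself and stopping it on demand. The class name `GroupLifecycleSketch`, the helper `runThenStop`, and the timer thread that stands in for a real shutdown signal are assumptions for illustration; only `withThreadPool`, `shutdownNow` and `awaitTermination` come from the JDK API.

```java
import java.nio.channels.AsynchronousChannelGroup;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GroupLifecycleSketch {

    /** Creates a group, asks it to stop after runMillis, and reports whether it terminated in time. */
    public static boolean runThenStop(final long runMillis) throws Exception {
        final AsynchronousChannelGroup group =
            AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(2));

        // Stand-in for a real shutdown trigger (signal, admin command, ...).
        Thread stopper = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(runMillis);
                    // Closes every channel in the group and shuts the group down.
                    group.shutdownNow();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        stopper.start();

        // Blocks until the group has terminated or the timeout elapses.
        return group.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("group terminated: " + runThenStop(500));
    }
}
```

In a server like AioTcpServer, the listener would be opened on this group, so `shutdownNow()` would also close the listener and all accepted sockets.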

 

3.2   AioAcceptHandler.java

 

package server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AioAcceptHandler
        implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

    @Override
    public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel attachment) {
        try {
            System.out.println("AioAcceptHandler.completed called");
            // Re-register right away so the next connection can be accepted.
            attachment.accept(attachment, this);
            System.out.println("Client connected: " + socket.getRemoteAddress().toString());
            startRead(socket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
        exc.printStackTrace();
    }

    public void startRead(AsynchronousSocketChannel socket) {
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
        // The buffer doubles as the attachment so the read handler receives it in completed().
        socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket));
    }
}

 

3.3  AioReadHandler.java

 

package server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel socket;
    private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();

    public AioReadHandler(AsynchronousSocketChannel socket) {
        this.socket = socket;
    }

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) {
            // Switch the buffer from filling to draining before decoding.
            buf.flip();
            try {
                System.out.println("Received from " + socket.getRemoteAddress().toString() + ": " + decoder.decode(buf));
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Reset the buffer for the next read, even if decoding failed.
                buf.clear();
            }
            // Register the next read with this same handler.
            socket.read(buf, buf, this);
        } else if (i == -1) {
            // -1 means the peer closed the connection.
            try {
                System.out.println("Client disconnected: " + socket.getRemoteAddress().toString());
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buf) {
        System.out.println(exc);
    }
}
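
One thing the read handler does not deal with: a TCP read may end in the middle of a multi-byte character (GBK and UTF-8 both have them), and a one-shot `decoder.decode(buf)` then reports an error. Below is a minimal sketch of a streaming decode that leaves an incomplete trailing character in the buffer for the next read. The class name `StreamingDecodeSketch` and the method `decodeAvailable` are hypothetical, UTF-8 is used in the demo only because it is always available, and malformed input is deliberately not handled.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

public class StreamingDecodeSketch {
    private final CharsetDecoder decoder;

    public StreamingDecodeSketch(Charset charset) {
        this.decoder = charset.newDecoder();
    }

    /**
     * Expects buf in "fill" mode (as left by a channel read). Returns the characters that are
     * complete so far and keeps any incomplete trailing bytes at the start of buf.
     */
    public String decodeAvailable(ByteBuffer buf) {
        buf.flip();
        CharBuffer out = CharBuffer.allocate(
            (int) Math.ceil(buf.remaining() * decoder.maxCharsPerByte()));
        // endOfInput = false: a partial character at the end is left unread instead of
        // being reported as an error. The CoderResult is ignored in this sketch, so
        // genuinely malformed input is not handled here.
        decoder.decode(buf, out, false);
        // Move the unread bytes to the front and return to "fill" mode.
        buf.compact();
        out.flip();
        return out.toString();
    }

    public static void main(String[] args) {
        StreamingDecodeSketch sketch = new StreamingDecodeSketch(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1024);
        byte[] data = "AIO \u4e2d\u6587".getBytes(StandardCharsets.UTF_8);

        // Simulate two reads that split the payload one byte before its end.
        buf.put(data, 0, data.length - 1);
        System.out.println("first read : " + sketch.decodeAvailable(buf));
        buf.put(data, data.length - 1, 1);
        System.out.println("second read: " + sketch.decodeAvailable(buf));
    }
}
```

In the handler, the next `socket.read(buf, buf, this)` would then append to whatever bytes `compact()` kept.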

 

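4. The TCP client also consists of three files.

   AioTcpClient is the main file.

   AioConnectHandler connects to the server asynchronously; the example opens 20,000 connections using 20 threads.

   AioReadHandler receives data from the server and is the same as the TCP server's.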

 

 

4.1 AioTcpClient.java

package client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AioTcpClient {

    private AsynchronousChannelGroup asyncChannelGroup;

    public AioTcpClient() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(20);
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
    }

    public void start(final String ip, final int port) throws Exception {
        // Open 20,000 concurrent connections, served by the 20-thread pool.
        for (int i = 0; i < 20000; i++) {
            try {
                AsynchronousSocketChannel connector = AsynchronousSocketChannel.open(asyncChannelGroup);
                connector.setOption(StandardSocketOptions.TCP_NODELAY, true);
                connector.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                connector.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                // The channel is passed as the attachment so the handler can use it once connected.
                connector.connect(new InetSocketAddress(ip, port), connector, new AioConnectHandler(i));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String... args) throws Exception {
        AioTcpClient client = new AioTcpClient();
        client.start("localhost", 9008);
    }
}
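
The loop above submits all 20,000 connects without waiting for any of them. If that turns out to be too aggressive for a given machine (file descriptors, ephemeral ports, listen backlog), one option is to cap the number of connects in flight. The following is a sketch under stated assumptions, not part of the original demo: `BoundedConnectSketch`, `connectMany`, `drain` and the throwaway loopback listener are hypothetical names and scaffolding.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConnectSketch {

    /** Connects n times to a throwaway local listener, with at most maxInFlight connects pending. */
    public static int connectMany(int n, int maxInFlight) throws Exception {
        final Semaphore permits = new Semaphore(maxInFlight);
        final CountDownLatch finished = new CountDownLatch(n);
        final AtomicInteger connected = new AtomicInteger();

        try (AsynchronousServerSocketChannel listener =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))) {
            drain(listener);
            SocketAddress addr = listener.getLocalAddress();
            for (int i = 0; i < n; i++) {
                // Blocks the submitting loop while maxInFlight connects are still pending.
                permits.acquire();
                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
                ch.connect(addr, ch, new CompletionHandler<Void, AsynchronousSocketChannel>() {
                    @Override
                    public void completed(Void result, AsynchronousSocketChannel c) {
                        connected.incrementAndGet();
                        finish(c);
                    }

                    @Override
                    public void failed(Throwable exc, AsynchronousSocketChannel c) {
                        finish(c);
                    }

                    private void finish(AsynchronousSocketChannel c) {
                        try {
                            c.close();
                        } catch (IOException ignored) {
                            // nothing useful to do in a sketch
                        }
                        permits.release();
                        finished.countDown();
                    }
                });
            }
            finished.await(10, TimeUnit.SECONDS);
        }
        return connected.get();
    }

    /** Stand-in server: accepts every connection and closes it immediately. */
    private static void drain(AsynchronousServerSocketChannel listener) {
        listener.accept(listener,
            new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
                @Override
                public void completed(AsynchronousSocketChannel s, AsynchronousServerSocketChannel server) {
                    if (server.isOpen()) {
                        server.accept(server, this);
                    }
                    try {
                        s.close();
                    } catch (IOException ignored) {
                        // nothing useful to do in a sketch
                    }
                }

                @Override
                public void failed(Throwable exc, AsynchronousServerSocketChannel server) {
                    // Reached once the listener is closed.
                }
            });
    }

    public static void main(String[] args) throws Exception {
        System.out.println("connected: " + connectMany(100, 10));
    }
}
```

In the real client the permit would be released in `AioConnectHandler` once the connect has completed or failed, while the connection itself stays open.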

4.2 AioConnectHandler.java

 

package client;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

public class AioConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {

    private final Integer content;

    public AioConnectHandler(Integer value) {
        this.content = value;
    }

    @Override
    public void completed(Void result, AsynchronousSocketChannel connector) {
        try {
            // Send this connection's index; get() blocks until the write has completed.
            connector.write(ByteBuffer.wrap(String.valueOf(content).getBytes())).get();
            startRead(connector);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException ep) {
            ep.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
        exc.printStackTrace();
    }

    public void startRead(AsynchronousSocketChannel socket) {
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
        // The buffer doubles as the attachment so the read handler receives it in completed().
        socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket));
    }
}
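
The connect handler above waits for the write with `Future.get()`, which ties up one of the 20 pool threads until the write finishes. A possible alternative is to write with a completion handler as well and re-issue the write until the buffer is drained. This is a minimal sketch, not the article's implementation: `AsyncWriteSketch`, `writeFully` and the loopback `roundTrip` demo are hypothetical names used for illustration.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncWriteSketch {

    /** Writes all remaining bytes of buf without blocking, then runs onDone. */
    public static void writeFully(final AsynchronousSocketChannel ch, ByteBuffer buf, final Runnable onDone) {
        ch.write(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer written, ByteBuffer b) {
                if (b.hasRemaining()) {
                    // A write may be partial: submit the rest with the same handler.
                    ch.write(b, b, this);
                } else {
                    onDone.run();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer b) {
                exc.printStackTrace();
            }
        });
    }

    /** Loopback demo: sends a short text to a local listener and returns what the listener read. */
    public static String roundTrip(String text) throws Exception {
        try (AsynchronousServerSocketChannel listener =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            client.connect(listener.getLocalAddress()).get(5, TimeUnit.SECONDS);
            try (AsynchronousSocketChannel peer = listener.accept().get(5, TimeUnit.SECONDS)) {
                final CountDownLatch sent = new CountDownLatch(1);
                writeFully(client, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)), new Runnable() {
                    @Override
                    public void run() {
                        sent.countDown();
                    }
                });
                sent.await(5, TimeUnit.SECONDS);

                // The demo payload is tiny, so a single read is assumed to return all of it.
                ByteBuffer in = ByteBuffer.allocate(1024);
                peer.read(in).get(5, TimeUnit.SECONDS);
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("server side read: " + roundTrip("42"));
    }
}
```

In `AioConnectHandler.completed`, the `onDone` callback would be the place to call `startRead(connector)`.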

4.3 AioReadHandler.java

 

package client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class AioReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final AsynchronousSocketChannel socket;
    private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();

    public AioReadHandler(AsynchronousSocketChannel socket) {
        this.socket = socket;
    }

    @Override
    public void completed(Integer i, ByteBuffer buf) {
        if (i > 0) {
            // Switch the buffer from filling to draining before decoding.
            buf.flip();
            try {
                System.out.println("Received from " + socket.getRemoteAddress().toString() + ": " + decoder.decode(buf));
            } catch (CharacterCodingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Reset the buffer for the next read, even if decoding failed.
                buf.clear();
            }
            // Register the next read with this same handler.
            socket.read(buf, buf, this);
        } else if (i == -1) {
            // -1 means the peer closed the connection.
            try {
                System.out.println("Peer disconnected: " + socket.getRemoteAddress().toString());
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buf) {
        System.out.println(exc);
    }
}

Reposted from: http://eozob.baihongyu.com/
