Java NIO多线程服务器实现

模型原理图

BOSS线程(ServerSocketChannel)专门负责建立链接,然后将accept到的SocketChannel分发给多个Worker线程。Worker线程有多个,可以分摊来自多个Client的SocketChannel。Worker线程专门负责read和write。

NIO多线程服务器实现

server端实现
package niomultithreadserver;   import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger;   public class MultiThreadServer {     public static void main(String[] args) throws IOException {         // 设置boss线程的名称为boss         Thread.currentThread().setName(boss);         // 创建ServerSocketChannel         ServerSocketChannel ssc = ServerSocketChannel.open();         /**          * 1. ServerSocketChannel配置为非阻塞模式          * 2. selector通常都是和非阻塞channel进行搭配          * 非阻塞channel一旦感兴趣的事件,则可以通过selector.select()方法将事件追加到selector的selectedKeys中          */         ssc.configureBlocking(false);         // 创建boss线程的selector,该selector专门监听accept事件         Selector boss = Selector.open();         // 将创建好的ServerSocketChannel和关注的accept事件注册到该selector         SelectionKey bossKey = ssc.register(boss, 0, null);         bossKey.interestOps(SelectionKey.OP_ACCEPT);         // ServerSocketChannel和端口进行绑定         ssc.bind(new InetSocketAddress(8088));         // 创建一定数量的worker         int cpuNum = Runtime.getRuntime().availableProcessors();         System.out.println(cpuNum: + cpuNum);         Worker[] workers = new Worker[cpuNum];         for (int i = 0; i < workers.length; i++) {             workers[i] = new Worker(worker- + i);             workers[i].initWorker();         }          AtomicInteger count = new AtomicInteger();         while (true) {             /**              * 轮训check查询的时候是否就绪,如果未就绪则select不会返回,只有监听的事件发生select()方法才返回              * 每个selector中有两个集合:              * 集合1: interested keys集合,存储的是当前selector感兴趣的channel + 事件类型(accept, connect, read, write)              * 集合2:selected keys集合,存储的是selector.select()方法调用之后扫描到的新发生的事件              * 注意每次select()方法调用时,可以理解为是往selected keys集合中追加本次select()新扫描的事件,上次select()并且加入的事件不会自动清除。              * 因此后面迭代器迭代selected keys集合时需要及时remove掉,否则下次循环会重复消费处理              */             boss.select();             Iterator<SelectionKey> bossSelectedKeysIter = boss.selectedKeys().iterator();             while (bossSelectedKeysIter.hasNext()) {                 SelectionKey key = bossSelectedKeysIter.next();                 /**                  * 注意及时remove掉已经监听到并且马上就要处理的事件                  * 因为selector的selectedKeys集合不会自动将key清除掉,这会导致下次循环重复处理                  */                 bossSelectedKeysIter.remove();                 if (key.isAcceptable()) {                     SelectableChannel keyChannel = key.channel();                     ServerSocketChannel sscFromSelector = (ServerSocketChannel) keyChannel;                     System.out.println(sscFromSelector == ssc? + (sscFromSelector == ssc));                      SocketChannel sc = sscFromSelector.accept();                     sc.configureBlocking(false);                     // 建立链接打印日志                     System.out.println(线程: + Thread.currentThread().getName() + : + connected ---  + sc.getRemoteAddress());                     // 分配worker                     System.out.println(线程: + Thread.currentThread().getName() + : + before register ---  + sc.getRemoteAddress());                     workers[count.incrementAndGet() % workers.length].registerChannel(sc);                     System.out.println(线程: + Thread.currentThread().getName() + : + after register ---  + sc.getRemoteAddress());                 }             }         }     }      static class Worker implements Runnable {         private Thread thread;         private Selector workerSelector;         private String name;         ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();          public Worker(String name) {             this.name = name;         }          public void initWorker() throws IOException {             thread = new Thread(this, name);             workerSelector = Selector.open();             thread.start();         }          /**          * 将一个SocketChannel分配给当前worker          *          * @param sc          */         public void registerChannel(SocketChannel sc) {             queue.add(() -> {                 try {                     sc.register(workerSelector, SelectionKey.OP_READ, null);                 } catch (ClosedChannelException e) {                     e.printStackTrace();                 }             });             // 注册到任务队列之后唤醒一下             workerSelector.wakeup();         }          @Override         public void run() {             while (true) {                 try {                     workerSelector.select();                     /**                      * 注册分配的channel                      */                     Runnable scRegisterTask = queue.poll();                     if (scRegisterTask != null) {                         scRegisterTask.run();                     }                      Iterator<SelectionKey> workerSelectedKeysIter = workerSelector.selectedKeys().iterator();                     while (workerSelectedKeysIter.hasNext()) {                         SelectionKey key = workerSelectedKeysIter.next();                         workerSelectedKeysIter.remove();                         if (key.isReadable()) {                             SocketChannel channel = (SocketChannel) key.channel();                             ByteBuffer byteBuffer = ByteBuffer.allocate(16);                             /**                              * 将channel中的数据写入buffer                              * 注意:两种情况需要做好处理:                              * 1. 如果客户端正常关闭了socket,则read返回的是-1,这时需要将客户端对应的channel从boss selector中cancel                              * 2. 如果客户端非正常关闭了socket,则需要捕获read方法,并将客户端对应的channel从boss selector中cancel                              */                             int readCnt = channel.read(byteBuffer);                             if (readCnt == -1) {                                 System.out.println(线程: + Thread.currentThread().getName() + : + 客户端关闭链接,取消channel监听...);                                 key.cancel();                             }                             System.out.println(线程: + Thread.currentThread().getName() + : + read data ---  + channel.getRemoteAddress());                             // 切换buffer为读模式                             byteBuffer.flip();                             // 读取                             System.out.println(Charset.forName(UTF-8).decode(byteBuffer));                         }                     }                 } catch (IOException e) {                     e.printStackTrace();                 }             }         }     } }  
client端实现
package niomultithreadserver;  import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset;  public class Client {     public static void main(String[] args) throws IOException {         SocketChannel sc = SocketChannel.open();         sc.connect(new InetSocketAddress(localhost, 8088));         sc.write(Charset.forName(UTF-8).encode(123abc));         System.in.read();     } }