Java NIO多线程服务器实现
模型原理图
BOSS线程(ServerSocketChannel)专门负责建立链接,然后将accept到的SocketChannel分发给多个Worker线程。Worker线程有多个,可以分摊来自多个Client的SocketChannel。Worker线程专门负责read和write。
NIO多线程服务器实现
server端实现
package niomultithreadserver; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; public class MultiThreadServer { public static void main(String[] args) throws IOException { // 设置boss线程的名称为boss Thread.currentThread().setName(boss); // 创建ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); /** * 1. ServerSocketChannel配置为非阻塞模式 * 2. selector通常都是和非阻塞channel进行搭配 * 非阻塞channel一旦感兴趣的事件,则可以通过selector.select()方法将事件追加到selector的selectedKeys中 */ ssc.configureBlocking(false); // 创建boss线程的selector,该selector专门监听accept事件 Selector boss = Selector.open(); // 将创建好的ServerSocketChannel和关注的accept事件注册到该selector SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); // ServerSocketChannel和端口进行绑定 ssc.bind(new InetSocketAddress(8088)); // 创建一定数量的worker int cpuNum = Runtime.getRuntime().availableProcessors(); System.out.println(cpuNum: + cpuNum); Worker[] workers = new Worker[cpuNum]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker(worker- + i); workers[i].initWorker(); } AtomicInteger count = new AtomicInteger(); while (true) { /** * 轮训check查询的时候是否就绪,如果未就绪则select不会返回,只有监听的事件发生select()方法才返回 * 每个selector中有两个集合: * 集合1: interested keys集合,存储的是当前selector感兴趣的channel + 事件类型(accept, connect, read, write) * 集合2:selected keys集合,存储的是selector.select()方法调用之后扫描到的新发生的事件 * 注意每次select()方法调用时,可以理解为是往selected keys集合中追加本次select()新扫描的事件,上次select()并且加入的事件不会自动清除。 * 因此后面迭代器迭代selected keys集合时需要及时remove掉,否则下次循环会重复消费处理 */ boss.select(); Iterator<SelectionKey> bossSelectedKeysIter = boss.selectedKeys().iterator(); while (bossSelectedKeysIter.hasNext()) { SelectionKey key = bossSelectedKeysIter.next(); /** * 注意及时remove掉已经监听到并且马上就要处理的事件 * 因为selector的selectedKeys集合不会自动将key清除掉,这会导致下次循环重复处理 */ bossSelectedKeysIter.remove(); if (key.isAcceptable()) { SelectableChannel keyChannel = key.channel(); ServerSocketChannel sscFromSelector = (ServerSocketChannel) keyChannel; System.out.println(sscFromSelector == ssc? + (sscFromSelector == ssc)); SocketChannel sc = sscFromSelector.accept(); sc.configureBlocking(false); // 建立链接打印日志 System.out.println(线程: + Thread.currentThread().getName() + : + connected --- + sc.getRemoteAddress()); // 分配worker System.out.println(线程: + Thread.currentThread().getName() + : + before register --- + sc.getRemoteAddress()); workers[count.incrementAndGet() % workers.length].registerChannel(sc); System.out.println(线程: + Thread.currentThread().getName() + : + after register --- + sc.getRemoteAddress()); } } } } static class Worker implements Runnable { private Thread thread; private Selector workerSelector; private String name; ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public Worker(String name) { this.name = name; } public void initWorker() throws IOException { thread = new Thread(this, name); workerSelector = Selector.open(); thread.start(); } /** * 将一个SocketChannel分配给当前worker * * @param sc */ public void registerChannel(SocketChannel sc) { queue.add(() -> { try { sc.register(workerSelector, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); // 注册到任务队列之后唤醒一下 workerSelector.wakeup(); } @Override public void run() { while (true) { try { workerSelector.select(); /** * 注册分配的channel */ Runnable scRegisterTask = queue.poll(); if (scRegisterTask != null) { scRegisterTask.run(); } Iterator<SelectionKey> workerSelectedKeysIter = workerSelector.selectedKeys().iterator(); while (workerSelectedKeysIter.hasNext()) { SelectionKey key = workerSelectedKeysIter.next(); workerSelectedKeysIter.remove(); if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(16); /** * 将channel中的数据写入buffer * 注意:两种情况需要做好处理: * 1. 如果客户端正常关闭了socket,则read返回的是-1,这时需要将客户端对应的channel从boss selector中cancel * 2. 如果客户端非正常关闭了socket,则需要捕获read方法,并将客户端对应的channel从boss selector中cancel */ int readCnt = channel.read(byteBuffer); if (readCnt == -1) { System.out.println(线程: + Thread.currentThread().getName() + : + 客户端关闭链接,取消channel监听...); key.cancel(); } System.out.println(线程: + Thread.currentThread().getName() + : + read data --- + channel.getRemoteAddress()); // 切换buffer为读模式 byteBuffer.flip(); // 读取 System.out.println(Charset.forName(UTF-8).decode(byteBuffer)); } } } catch (IOException e) { e.printStackTrace(); } } } } }
client端实现
package niomultithreadserver; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress(localhost, 8088)); sc.write(Charset.forName(UTF-8).encode(123abc)); System.in.read(); } }