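In the previous post, "linux下网络编程socket&select&epoll的底层实现原理" (on how socket, select, and epoll are implemented for network programming on Linux), we introduced blocking network I/O and event-driven non-blocking I/O. The basic NIO API is provided by Java, but there are different ways to use it in application code. In network server development, NIO is generally used through the Reactor model, which has three main variants: single Reactor with a single thread, single Reactor with multiple threads, and multiple Reactors with multiple threads. Below we walk through these three models; this post is mainly organized on the basis of <>.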
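I. Network services
Most network services and distributed applications share the same basic processing flow, which can be divided into the five steps below. The Reactor model is mainly about how to build on the NIO API and use multiple CPUs and threads to speed up each of these steps.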
1. Read the request data
2. Decode the request data
3. Process the data
4. Encode the reply data
5. Send the reply
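The five steps above can be sketched as a small pipeline. This is only an illustration: the class name `RequestPipeline` and the "echo" business logic are hypothetical and do not come from the demos in this post. Steps 1 and 5 (read and send) are the I/O ends, so here they are represented by the method's input and return value.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the five processing steps; not part of the demo servers.
class RequestPipeline {
    // Step 2: decode the raw request bytes into a message
    static String decode(byte[] requestBytes) {
        return new String(requestBytes, StandardCharsets.UTF_8);
    }

    // Step 3: process the data (a stand-in "echo" business rule)
    static String compute(String request) {
        return "echo: " + request.trim();
    }

    // Step 4: encode the reply into bytes
    static byte[] encode(String reply) {
        return reply.getBytes(StandardCharsets.UTF_8);
    }

    // Step 1 (read) supplies requestBytes; step 5 (send) consumes the returned bytes
    static byte[] handle(byte[] requestBytes) {
        return encode(compute(decode(requestBytes)));
    }

    public static void main(String[] args) {
        byte[] reply = handle("ping\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(reply, StandardCharsets.UTF_8)); // prints "echo: ping"
    }
}
```

Keeping decode, compute, and encode free of I/O is what later allows them to be moved onto other threads without touching the read and send code.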
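II. The classic blocking I/O server design
The most basic design for a network service is to start a new thread for each incoming request (connection).
This model is blocking: every newly accepted Socket gets its own thread, and that thread stays occupied until the socket is released, even while the socket is idle. We also mentioned this in the previous post.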
package org.example.reactor;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
class ClassicServer{
public static void main(String[] args) {
ClassicServer classicServer = new ClassicServer();
classicServer.start();
}
public static final int PORT = 9999;
public void start() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted()) {
new Thread(new Handler(ss.accept())).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
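public void run() {
// try-with-resources closes the socket when the client disconnects or an error occurs
try (Socket s = socket) {
InputStream inputStream = s.getInputStream();
while (true) {
// Blocks until data arrives; an empty result means the peer closed the connection
byte[] read = read(inputStream);
if (read.length == 0) {
break;
}
String decode = decode(read);
String compute = compute(decode);
byte[] encode = encode(compute);
send(s, encode);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}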
private byte[] read(InputStream inputStream) throws IOException {
byte[] input = new byte[1024];
int read = inputStream.read(input);
if (read > 0){
byte[] returnBytes = new byte[read];
System.arraycopy(input,0,returnBytes,0,read);
return returnBytes;
}
return new byte[0];
}
private String decode(byte[] bytes){
return new String(bytes, StandardCharsets.UTF_8);
}
private String compute(String msg){
System.out.println("print server msg ---- " + msg);
return "Hello Client, I am Server";
}
private byte[] encode(String returnMsg){
return returnMsg.getBytes(StandardCharsets.UTF_8);
}
private void send(Socket socket, byte[] returnBytes){
try {
socket.getOutputStream().write(returnBytes);
socket.getOutputStream().flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
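III. The Reactor model
The Reactor model uses NIO and splits the work into separate roles. For example, accept can be pulled out into its own component, and because the business logic is usually where an application spends most of its time, compute can be moved to a thread pool.
NIO is event-driven and built on I/O multiplexing. We also covered this in the previous post.
Because a select call polls for ready events instead of blocking in accept or read, a single thread can manage many SocketChannels.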
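To make the "one thread, many channels" point concrete, here is a minimal sketch. It is an assumption-laden illustration rather than part of the demos: it uses `java.nio.channels.Pipe` instead of sockets so that it runs without a network, the class name `MultiplexSketch` is hypothetical, and it assumes each small message arrives in a single read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: a single thread reads from several channels through one Selector.
class MultiplexSketch {
    static List<String> readFromAll(int channelCount) {
        List<String> received = new ArrayList<>();
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < channelCount; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                // The readable end must be non-blocking before it can be registered
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                byte[] msg = ("msg-" + i).getBytes(StandardCharsets.UTF_8);
                pipe.sink().write(ByteBuffer.wrap(msg));
            }
            while (received.size() < channelCount) {
                // One call waits on every registered channel at once
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isReadable()) {
                        continue;
                    }
                    ByteBuffer buffer = ByteBuffer.allocate(64);
                    int n = ((Pipe.SourceChannel) key.channel()).read(buffer);
                    if (n > 0) {
                        buffer.flip();
                        received.add(StandardCharsets.UTF_8.decode(buffer).toString());
                        // This sketch reads one message per channel, then stops watching it
                        key.cancel();
                    }
                }
            }
            for (Pipe pipe : pipes) {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        // The order of the messages depends on which channels the selector reports first
        System.out.println(readFromAll(3));
    }
}
```

No thread is ever parked in a blocking read on one particular channel; the only place the thread waits is `select`, which covers all channels together.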
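1. Single-threaded single-Reactor model
1) Overview
In the single-threaded single-Reactor model, the accept step is handed to an Acceptor, and the Reactor dispatches each ready event to the code that handles it.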
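One common way to implement the dispatch step is to attach a handler object to each `SelectionKey` at registration time and let the loop simply run whatever is attached. The sketch below shows that variation under stated assumptions: the class name `AttachmentDispatchSketch` is hypothetical, a `Pipe` stands in for a socket, and the handler only counts events. The demo in this post takes a different route and branches on `isAcceptable()` / `isReadable()`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of attachment-based dispatch; not the approach used by the demo server.
class AttachmentDispatchSketch {
    // Runs the handler that was attached when the channel was registered
    static void dispatch(SelectionKey key) {
        Object attachment = key.attachment();
        if (attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }

    // Returns how many events were dispatched before the loop stopped
    static int dispatchOnce() {
        AtomicInteger handled = new AtomicInteger();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            // A real handler would read from the channel; this one only records the event
            Runnable readHandler = handled::incrementAndGet;
            pipe.source().register(selector, SelectionKey.OP_READ, readHandler);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
            while (handled.get() == 0) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key);
                }
            }
            pipe.sink().close();
            pipe.source().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(dispatchOnce()); // prints "1"
    }
}
```

With this style the event loop does not need to know which kinds of channels exist; an Acceptor and a read handler would both just be attachments.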
Let's look at a concrete implementation of this model:
2) Demo implementation
Server side:
package org.example.reactor.single;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
class SingleThreadSingleReactor{
private final Selector selector;
private final ServerSocketChannel serverSocket;
SingleThreadSingleReactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws IOException {
SingleThreadSingleReactor singleThreadSingleReactor = new SingleThreadSingleReactor(6666);
singleThreadSingleReactor.dispatcher();
}
public void dispatcher() throws IOException {
System.out.println("server start dispatcher-----------");
Acceptor acceptor = new Acceptor();
Handler handler = new Handler();
while (true){
int select = selector.select(200);
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()){
System.out.println("client acceptor-----------");
acceptor.acceptor();
}else if (selectionKey.isReadable()){
System.out.println("server read msg ---------");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
byte[] read = read(socketChannel);
if (read.length > 0){
byte[] handlerMsg = handler.handler(read);
if (handlerMsg != null){
send(socketChannel,handlerMsg);
}
}
}
keyIterator.remove();
}
}
}
}
private byte[] read(SocketChannel socket) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socket.read(byteBuffer);
if (read > 0){
byte[] returnBytes = new byte[read];
System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
return returnBytes;
}
if (read < 0){
// -1 means the peer closed the connection; closing the channel also cancels its key
socket.close();
}
return new byte[0];
}
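private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
// A non-blocking write may send only part of the buffer, so loop until it is drained
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}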
// Acceptor: handles new connections
class Acceptor {
public void acceptor() {
try {
SocketChannel c = serverSocket.accept();
c.configureBlocking(false);
c.register(selector,SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public class Handler {
public byte[] handler(byte[] bytes) {
// Decode the data according to its business meaning
String decode = decode(bytes);
// Business processing
String compute = compute(decode);
// Encode the result for the reply
byte[] encode = encode(compute);
return encode;
}
private String decode(byte[] bytes){
return new String(bytes, StandardCharsets.UTF_8);
}
private String compute(String msg){
System.out.println("msg compute handler start------------");
System.out.println("print client msg ---- " + msg);
System.out.println("msg compute handler end------------");
return "server compute msg return";
}
private byte[] encode(String returnMsg){
return returnMsg.getBytes(StandardCharsets.UTF_8);
}
}
}
Client side:
package org.example.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class NioClientMain {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 4; i++) {
Thread.sleep(1000);
new Thread(() -> {
try {
multiThread();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
private static void multiThread() throws IOException, InterruptedException {
Selector selector = Selector.open();
SocketChannel clientSocketChannel = SocketChannel.open();
clientSocketChannel.configureBlocking(false);
clientSocketChannel.connect(new InetSocketAddress("127.0.0.1",6666));
clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true){
int select = selector.select(200);
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()){
socketChannel.finishConnect();
System.out.println("server is connect-------");
socketChannel.register(selector,SelectionKey.OP_WRITE);
}else if (selectionKey.isWritable()){
Thread.sleep(3000);
System.out.println("client send msg-------" + Thread.currentThread().getName() );
String sendMsg = Thread.currentThread().getName() + ": Hello Server";
ByteBuffer byteBuffer = ByteBuffer.wrap(sendMsg.getBytes(StandardCharsets.UTF_8));
socketChannel.write(byteBuffer);
socketChannel.register(selector,SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0){
byte[] bytes = new byte[read];
byteBuffer.flip();
byteBuffer.get(bytes);
System.out.println(Thread.currentThread().getName() + " current client ip " + socketChannel.getLocalAddress());
System.out.println(Thread.currentThread().getName() + " server msg: "+ new String(bytes, StandardCharsets.UTF_8));
socketChannel.register(selector,SelectionKey.OP_WRITE);
}
}
keyIterator.remove();
}
}
}
}
}
Run output (server console first, then the client console):
server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-3
client send msg-------Thread-2
client send msg-------Thread-1
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:7623
Thread-0 server msg: server compute msg return
Thread-1 current client ip /127.0.0.1:7621
Thread-1 server msg: server compute msg return
Thread-3 current client ip /127.0.0.1:7622
Thread-3 server msg: server compute msg return
Thread-2 current client ip /127.0.0.1:7620
Thread-2 server msg: server compute msg return
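2. Multi-threaded single-Reactor model
1) Overview
In the single-threaded single-Reactor model above, the business logic runs on the same single thread, so handling one request blocks every request behind it. Now let's look at the multi-threaded version.
In this model, the business processing (decode, compute, encode) is handed to a workerThreads thread pool, so a slow request no longer holds up the handling of the next one.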
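The hand-off between the Reactor thread and the worker pool can be sketched as follows. Note that this is a swapped-in technique for illustration, not what the demo in this post does: here workers push finished replies onto a queue and call `Selector.wakeup()`, and the Reactor thread drains the queue after `select` returns. The class name `WorkerHandOffSketch` and the "reply to" logic are hypothetical, and no channels are registered so that the sketch stays self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: workers compute replies, the Reactor thread collects them.
class WorkerHandOffSketch {
    static List<String> process(List<String> requests) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        Queue<String> completed = new ConcurrentLinkedQueue<>();
        List<String> replies = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (String request : requests) {
                workers.submit(() -> {
                    // decode / compute / encode would run here, off the Reactor thread
                    completed.add("reply to " + request);
                    // Enqueue first, then wake the Reactor so the reply is never missed
                    selector.wakeup();
                });
            }
            while (replies.size() < requests.size()) {
                // Returns on I/O readiness, on wakeup(), or after the timeout
                selector.select(200);
                String reply;
                while ((reply = completed.poll()) != null) {
                    replies.add(reply); // a real server would write this to its channel
                }
            }
            // Let workers finish their wakeup() calls before the selector is closed
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdownNow();
        }
        return replies;
    }

    public static void main(String[] args) {
        // Reply order depends on worker scheduling
        System.out.println(process(List.of("a", "b", "c")));
    }
}
```

The design choice here is that only the Reactor thread ever touches channels and keys, while the workers touch only plain data and the queue.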
2) Demo implementation
Server side:
package org.example.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MultiThreadSingleReactor {
private final Selector selector;
private final ServerSocketChannel serverSocket;
private ExecutorService executorService;
MultiThreadSingleReactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
executorService = Executors.newFixedThreadPool(4);
}
public static void main(String[] args) throws IOException {
MultiThreadSingleReactor multiThreadSingleReactor = new MultiThreadSingleReactor(6666);
multiThreadSingleReactor.dispatcher();
}
public void dispatcher() throws IOException {
System.out.println("server start dispatcher-----------");
Acceptor acceptor = new Acceptor();
Handler handler = new Handler();
while (true){
int select = selector.select(200);
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()){
System.out.println("client acceptor-----------");
acceptor.acceptor();
}else if (selectionKey.isReadable()){
System.out.println("server read msg ---------");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// The dispatcher thread reads the data
byte[] read = read(socketChannel);
if (read.length > 0){
// The thread pool performs decode, compute, and encode
executorService.submit(() -> {
try {
handler.handler(socketChannel,read);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
}
}else if (selectionKey.isWritable()){
byte[] bytes = (byte[]) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (Objects.nonNull(bytes)){
// The dispatcher thread writes the data
send(socketChannel,bytes);
}
}
keyIterator.remove();
}
}
}
}
private byte[] read(SocketChannel socket) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socket.read(byteBuffer);
if (read > 0){
byte[] returnBytes = new byte[read];
System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
return returnBytes;
}
if (read < 0){
// -1 means the peer closed the connection; closing the channel also cancels its key
socket.close();
}
return new byte[0];
}
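private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
// A non-blocking write may send only part of the buffer, so loop until it is drained
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// Switch the key back to read interest once the reply has been sent
socketChannel.register(selector,SelectionKey.OP_READ);
}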
// Acceptor: handles new connections
class Acceptor {
public void acceptor() {
try {
SocketChannel c = serverSocket.accept();
c.configureBlocking(false);
c.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public class Handler {
public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {
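// Decode the data according to its business meaning
String decode = decode(bytes);
// Business processing
String compute = compute(decode);
// Encode the result for the reply
byte[] encode = encode(compute);
// Pass the reply as the attachment in the same register call instead of attaching it afterwards
socketChannel.register(selector, SelectionKey.OP_WRITE, encode);
// Wake the Reactor so it picks up the write interest without waiting for its select timeout
selector.wakeup();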
}
private String decode(byte[] bytes){
return new String(bytes, StandardCharsets.UTF_8);
}
private String compute(String msg){
System.out.println("msg compute handler start------------");
System.out.println("print client msg ---- " + msg);
System.out.println("msg compute handler end------------");
return "server compute msg return";
}
private byte[] encode(String returnMsg){
return returnMsg.getBytes(StandardCharsets.UTF_8);
}
}
}
For the client, the previous Client can be reused as is.
Run output (server console first, then the client console):
server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------
server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:9953
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:9956
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:9959
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:9962
Thread-3 server msg: server compute msg return
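3. Multi-threaded multi-Reactor model
1) Overview
Although the multi-threaded single-Reactor model above uses a thread pool so that business processing no longer blocks, events are still selected through a single Selector. With one Selector managing every SocketChannel, that part becomes a bottleneck when there are many connections, and all reads and writes also run in that one Reactor.
So we can optimize further with multiple Reactors: a main Reactor only performs accept and then hands each accepted SocketChannel to a sub-Reactor. Several sub-Reactors can be configured, which spreads event polling, data reads, business processing, and data writes across multiple Reactors and makes better use of multiple threads and CPUs. That is the overall shape of the multi-Reactor model.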
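Handing accepted channels to "the next" sub-Reactor is usually done round-robin. A minimal sketch of such a chooser follows; the class name `RoundRobinSketch` is hypothetical, and it assumes that only the main Reactor thread calls it, which is why a plain `int` is used rather than an atomic counter.

```java
// Hypothetical sketch of round-robin selection among sub-Reactors.
class RoundRobinSketch {
    private final int size;
    private int next = 0;

    RoundRobinSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Returns the current index, then advances so the next caller gets the next slot
    int nextIndex() {
        int index = next;
        next = (next + 1) % size;
        return index;
    }

    public static void main(String[] args) {
        RoundRobinSketch chooser = new RoundRobinSketch(4);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(chooser.nextIndex()).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "0 1 2 3 0 1"
    }
}
```

The important detail is that the index advances on every call; taking the modulo without incrementing would send every connection to the same sub-Reactor.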
2) Demo implementation
Server side:
package org.example.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class SingleThreadMainReactor {
private final Selector selector;
private final ServerSocketChannel serverSocket;
private final MultiThreadSubReactor[] multiThreadSubReactors;
public static final int mulSubNum = 4;
private int nextSubIndex = 0;
SingleThreadMainReactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
multiThreadSubReactors = new MultiThreadSubReactor[mulSubNum];
for (int i = 0; i < mulSubNum; i++) {
multiThreadSubReactors[i] = new MultiThreadSubReactor();
new Thread(multiThreadSubReactors[i]).start();
}
}
public static void main(String[] args) throws IOException {
SingleThreadMainReactor mainReactor = new SingleThreadMainReactor(6666);
mainReactor.accept();
}
public void accept() throws IOException {
System.out.println("server start accept-----------");
Acceptor acceptor = new Acceptor();
while (true){
int select = selector.select(200);
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isAcceptable()){
System.out.println("client acceptor-----------");
acceptor.acceptor();
}
keyIterator.remove();
}
}
}
}
// Acceptor: handles new connections
class Acceptor {
public void acceptor() {
try {
SocketChannel c = serverSocket.accept();
c.configureBlocking(false);
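// Round-robin: take the current index, then advance it so the next connection goes to the next sub-Reactor
int subIndex = nextSubIndex;
nextSubIndex = (nextSubIndex + 1) % mulSubNum;
System.out.println("server subReactor register--" + subIndex);
multiThreadSubReactors[subIndex].register(c);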
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
package org.example.reactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MultiThreadSubReactor implements Runnable{
private final Selector selector;
private ExecutorService executorService;
MultiThreadSubReactor() throws IOException {
selector = Selector.open();
executorService = Executors.newFixedThreadPool(4);
}
@Override
public void run() {
System.out.println("MultiThreadSubReactor start-----------");
Handler handler = new Handler();
while (true){
try {
int select = selector.select(200);
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey selectionKey = keyIterator.next();
if (selectionKey.isReadable()){
System.out.println("subReactor read msg ---------");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// The sub-Reactor thread reads the data
byte[] read = read(socketChannel);
if (read.length > 0){
// The thread pool performs decode, compute, and encode
executorService.submit(() -> {
try {
handler.handler(socketChannel,read);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
}
}else if (selectionKey.isWritable()){
byte[] bytes = (byte[]) selectionKey.attachment();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (Objects.nonNull(bytes)){
// The sub-Reactor thread writes the data
send(socketChannel,bytes);
}
}
keyIterator.remove();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private byte[] read(SocketChannel socket) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socket.read(byteBuffer);
if (read > 0){
byte[] returnBytes = new byte[read];
System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
return returnBytes;
}
if (read < 0){
// -1 means the peer closed the connection; closing the channel also cancels its key
socket.close();
}
return new byte[0];
}
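private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
// A non-blocking write may send only part of the buffer, so loop until it is drained
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// Switch the key back to read interest once the reply has been sent
socketChannel.register(selector,SelectionKey.OP_READ);
}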
public void register(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(selector,SelectionKey.OP_READ);
}
public class Handler {
public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {
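// Decode the data according to its business meaning
String decode = decode(bytes);
// Business processing
String compute = compute(decode);
// Encode the result for the reply
byte[] encode = encode(compute);
// Pass the reply as the attachment in the same register call instead of attaching it afterwards
socketChannel.register(selector, SelectionKey.OP_WRITE, encode);
// Wake the sub-Reactor so it picks up the write interest without waiting for its select timeout
selector.wakeup();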
}
private String decode(byte[] bytes){
return new String(bytes, StandardCharsets.UTF_8);
}
private String compute(String msg){
System.out.println("msg compute handler start------------");
System.out.println("print client msg ---- " + msg);
System.out.println("msg compute handler end------------");
return "server compute msg return";
}
private byte[] encode(String returnMsg){
return returnMsg.getBytes(StandardCharsets.UTF_8);
}
}
}
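The client is again the same one as before.
Run output (server console first, then the client console). In this captured run all four connections were registered with sub-Reactor 0, which is what happens when nextSubIndex is never advanced; with round-robin advancement the registered indices would read 0, 1, 2, 3: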
MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
server start accept-----------
MultiThreadSubReactor start-----------
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:1989
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:1992
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:1995
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:1998
Thread-3 server msg: server compute msg return