首页 >> 大全

java网络编程模型之BIO、NIO、AIO

2023-12-14 大全 24 作者:考证青年

AIO模型采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,再主动通知应用程序,触发相应的函数。流程图如下

AIO框架简析

下面的图片展示了AIO模型中各个类与接口之间的关系

基本执行流程

首先来介绍一下aio模型的基于客户端的基本执行流程

通过roup线程组创建一个对象,用于监听和接受客户端的连接请求。调用的bind()方法,将其绑定到指定的IP地址和端口号。调用的()方法,开始监听客户端的连接请求。一旦监听到客户端的连接请求,在()方法中传入实例,将服务端的通道初始化。在通道初始化时会创建一个实例作为参数传入,继承了(通用消息处理器),在类中定义了三个抽象类,分别是(通道激活前)、(通道断开前)、(读取消息抽象类前),在中对上述方法进行重写,在这些方法可以增加相应的业务逻辑。中的方法会被触发,该方法是连接建立后进行异步读取数据的回调函数,在该回调函数中首先会读取数据进行判断是否关闭通道(在关闭通道之前会调用),然后利用缓冲流进行数据读取(),然后调用异步通道对象的read方法注册下一次的异步读取事件,进行循环读取。由于该模型的通信方式是全双工通信模式,因此客户端和服务端可以同时互相发送消息,服务端要进行消息发送时会通过中的方法进行消息推送。 代码实现

客户端

package com.kjz.NettyDemo.Aio.client;import java.nio.channels.AsynchronousSocketChannel; // 异步Socket通道
import java.net.InetSocketAddress; // InetSocketAddress类用于封装IP地址和端口号
import java.nio.ByteBuffer; // ByteBuffer类用于读写操作
import java.nio.charset.Charset; // Charset类用于指定字符编码方式
import java.util.concurrent.Future; // Future接口用于获取异步操作的结果
public class AioClient {public static void main(String[] args) throws Exception {// 创建异步Socket通道AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();// 建立连接并返回Future对象Future future = socketChannel.connect(new InetSocketAddress("192.168.1.116", 7397));// 打印启动提示信息System.out.println("kjz-demo-netty client start done.");// 等待连接完成future.get();// 通过通道进行数据读取操作//参数一:一个容量为1024的字节缓冲区对象,用于存储服务器发送的数据。//参数二:是一个附件对象,可以在异步操作完成后传递给回调函数。//参数三:回调函数,用于在异步读取完成后处理读取的结果(消息处理器)socketChannel.read(ByteBuffer.allocate(1024),  null,new AioClientHandler(socketChannel, Charset.forName("GBK")));// 睡眠100秒,保持客户端运行//通过调用 Thread.sleep 方法可以暂停当前线程的执行,确保客户端的事件循环(Event Loop)持续执行,// 从而保持与服务器的通信。Thread.sleep(100000);}}

客户端消息处理器

package com.kjz.NettyDemo.Aio.client;import com.kjz.NettyDemo.Aio.ChannelAdapter;
import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
//客户端消息处理器
public class AioClientHandler extends ChannelAdapter {public AioClientHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}//channelActive方法在AIO客户端与服务器建立连接后被调用,输出连接信息。@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程服务端:" +//ctx通道处理器,从通道处理器中获取当前连接的异步通道对象ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功} catch (IOException e) {e.printStackTrace();}}//断开连接@Overridepublic void channelInactive(ChannelHandler ctx) {}/*channelRead方法在AIO客户端接收到服务器发送的消息时被调用,首先输出接收到的消息,然后通过ctx.writeAndFlush方法向服务器发送响应消息,告知服务器已经成功处理该消息。全双共通信方式*/@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "\r\n");ctx.writeAndFlush("客户端信息处理Success!\r\n");}}

服务端

package com.kjz.NettyDemo.Aio.server;import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
//服务端
public class AioServer extends  Thread{//异步服务端socket通道对象private AsynchronousServerSocketChannel serverSocketChannel;@Overridepublic void run() {try {//使用AsynchronousChannelGroup是为了管理异步通道资源,它可以将多个异步通道共享同一线程池。// 在这里,使用固定大小的线程池来处理异步请求,避免了每次请求都创建新线程的开销。serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(),10));//绑定端口serverSocketChannel.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty server start done.");//通过CountDownLatch等待客户端连接CountDownLatch latch = new CountDownLatch(1);//监听客户端连接serverSocketChannel.accept(this, new AioServerChannelInitializer());latch.await();} catch (Exception e) {e.printStackTrace();}}public AsynchronousServerSocketChannel serverSocketChannel() {return serverSocketChannel;}public static void main(String[] args) {new AioServer().start();}
}

服务端初始化通道

package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelInitializer;import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
//服务端初始化通道
public class AioServerChannelInitializer extends ChannelInitializer {@Overrideprotected void initChannel(AsynchronousSocketChannel channel) throws Exception {channel.read(ByteBuffer.allocate(1024), 10, TimeUnit.SECONDS,null, new AioServerHandler(channel, Charset.forName("GBK")));}}

服务端消息处理器

package com.kjz.NettyDemo.Aio.server;import com.kjz.NettyDemo.Aio.ChannelAdapter;
import com.kjz.NettyDemo.Aio.ChannelHandler;import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Date;public class AioServerHandler extends ChannelAdapter {public AioServerHandler(AsynchronousSocketChannel channel, Charset charset) {super(channel, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接远程客户端:" +ctx.channel().getRemoteAddress()+"成功");//通知客户端链接建立成功ctx.writeAndFlush("服务端连接建立成功" + " " + new Date() + " " +ctx.channel().getRemoteAddress() + "\r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelInactive(ChannelHandler ctx) {}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println("服务端收到:" + new Date() + " " + msg + "\r\n");ctx.writeAndFlush("服务端信息处理Success!\r\n");}}

通用消息处理器

package com.kjz.NettyDemo.Aio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
//通用消息处理器
public abstract class ChannelAdapter implements CompletionHandler {//异步socket通道private AsynchronousSocketChannel channel;//字符集private Charset charset;//构造方法public ChannelAdapter(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;//通道打开时if (channel.isOpen()) {channelActive(new ChannelHandler(channel, charset));}}//异步读取到数据后的回调函数,该方法根据异步读取结果判断是否需要关闭通道,// 然后将缓冲区中的字节数组转换为字符串,并通过channelRead方法处理读取到的消息。@Overridepublic void completed(Integer result, Object attachment) {try {//创建一个大小为1024的ByteBuffer对象,用于存储读取到的数据final ByteBuffer buffer = ByteBuffer.allocate(1024);//定义一个超时时间,单位为秒,默认为1小时。final long timeout = 60 * 60L;//调用channel.read方法进行异步读取//buffer参数是要读取的缓冲区//timeout是读取超时时间//TimeUnit.SECONDS表示超时时间的单位//attachment;附件//new CompletionHandler()返回一个新的CompletionHandler对象用于处理读取结果。channel.read(buffer, timeout, TimeUnit.SECONDS, null,//匿名类new CompletionHandler() {@Overridepublic void completed(Integer result, Object attachment) {//判断是否需要关闭通道if (result == -1) {try {//首先判断result是否等于-1,如果是则表示通道已关闭,// 需要执行关闭通道的逻辑。在关闭通道之前,会调用channelInactive方法表示通道已断开,// 并关闭通道。channelInactive(new ChannelHandler(channel, charset));channel.close();} catch (IOException e) {e.printStackTrace();}return;}//使用buffer.flip()反转缓冲区,将读取到的数据准备为读取状态。buffer.flip();//创建一个新的ChannelHandler对象,并通过channelRead方法处理读取到的消息,// 同时传入通道和字符集。channelRead(new ChannelHandler(channel, charset), charset.decode(buffer));//调用buffer.clear()清空缓冲区,以便重新写入数据。buffer.clear();//注册下一次的异步读取事件,使用之前定义的超时时间和时间单位,null表示附件,// this表示当前的CompletionHandler对象。循环读取channel.read(buffer, timeout, TimeUnit.SECONDS, null, this);}//异步读取出现异常时的回调函数,将异常信息输出到控制台。@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.getStackTrace();}//通道激活public abstract void channelActive(ChannelHandler ctx);//通道断开public abstract void channelInactive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Aio;import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
//通道处理器
public class ChannelHandler {private AsynchronousSocketChannel channel;private Charset charset;public ChannelHandler(AsynchronousSocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//用于将数据写入异步Socket通道中,并发送到远程节点public void writeAndFlush(Object msg) {//方法接收一个Object类型的参数msg,首先将其转换为字节数组byte[] bytes = msg.toString().getBytes(charset);//根据该字节数组的长度创建一个新的ByteBuffer对象writeBuffer。ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//字节数组写入缓冲区中writeBuffer.put(bytes);//将缓冲区准备为写入状态writeBuffer.flip();channel.write(writeBuffer);}public AsynchronousSocketChannel channel() {return channel;}public void setChannel(AsynchronousSocketChannel channel) {this.channel = channel;}
}

初始化处理通道的回调方法

package com.kjz.NettyDemo.Aio;import com.kjz.NettyDemo.Aio.server.AioServer;import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//初始化处理通道的回调方法
public abstract class ChannelInitializer implementsCompletionHandler {@Overridepublic void completed(AsynchronousSocketChannel channel, AioServer attachment) {try {//调用initChannel(channel)方法对通道进行初始化操作initChannel(channel);} catch (Exception e) {e.printStackTrace();} finally {//都会执行// 再此接收客户端连接,保持持续监听attachment.serverSocketChannel().accept(attachment, this);        }}@Overridepublic void failed(Throwable exc, AioServer attachment) {exc.getStackTrace();}//初始化通道方法,具体的初始化逻辑由子类实现,因此该方法被声明为抽象方法。protected abstract void initChannel(AsynchronousSocketChannel channel) throws Exception;}

BIO模型 基础补充

BIO模型是基于实现的,对象是实现网络通信的基础类之一,用于在客户端和服务器之间建立可靠的双向通信连接。下面首先先介绍一下该对象

和:

类用于客户端,可以与服务端建立连接,发送请求和接收响应。

类用于服务器端,监听指定的端口,接受客户端的连接请求,并创建相应的对象进行通信。

构造方法:

类的常用构造方法有以下几种:

( host, int port):根据主机名和端口号创建对象。

( , int port):根据IP地址和端口号创建对象。

( host, int port, , int ):根据主机名、 端口号、本地IP地址和本地端口号创建对象。

类的常用构造方法有以下几种:

(int port):创建一个绑定到指定端口号的对象。

(int port, int ):创建一个绑定到指定端口号,并指定连接请 求队列长度的对象。

(int port, int , ):创建一个绑定到指 定端口号和本地IP地址,并指定连接请求队列长度的对象。

常用方法:

类的常用方法:

():获取与关联的输入流,用于接收服务器发送的数据。

():获取与关联的输出流,用于向服务器发送数据。

():判断是否连接到远程主机。

():判断是否已关闭。

close():关闭连接。

类的常用方法:

():监听并接受客户端的连接请求,并返回一个新的对象供通信 使用。

():判断是否已绑定到指定的端口。

():判断是否已关闭。

close():关闭。

Java中的对象是网络通信的基础类,用于建立客户端与服务器之间的连接并进行数据交换。它提供了丰富的方法和功能,方便开发者在网络编程中进行数据传输和操作。

执行基本流程

从服务端来分析:

创建对象,绑定监听端口。进入循环等待客户端连接请求。在循环中调用方法接受客户端的连接请求,一旦有客户端连接成功,就创建一个新的对象来处理与该客户端的通信。BIO模型的特点是采用阻塞式I/O,即当服务器执行方法时,如果没有客户端连接到来,服务器会一直阻塞在这一步,直到有新的连接请求到达才会继续执行。创建线程处理客户端的请求和数据传输。每个客户端连接都会占用一个独立的线程来处理通信。服务器持续监听客户端的连接请求。 代码实现

客户端

package com.kjz.NettyDemo.Bio.client;import java.io.IOException;
import java.net.Socket;
import java.nio.charset.Charset;public class BioClient {public static void main(String[] args) {try {Socket socket = new Socket("192.168.1.116", 7397);System.out.println("kjz-demo-netty client start done.");BioClientHandler bioClientHandler = new BioClientHandler(socket,Charset.forName("utf-8"));bioClientHandler.start();} catch (IOException e) {e.printStackTrace();}}
}

客户端消息处理器

package com.kjz.NettyDemo.Bio.client;import com.kjz.NettyDemo.Bio.ChannelAdapter;
import com.kjz.NettyDemo.Bio.ChannelHandler;import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
//客户端消息处理器
public class BioClientHandler extends ChannelAdapter {public BioClientHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接报告LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioClient to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}
}

服务端

package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.server.BioServerHandler;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;public class BioServer extends Thread {private ServerSocket serverSocket = null;public static void main(String[] args) {BioServer bioServer = new BioServer();bioServer.start();}@Overridepublic void run() {try {serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(7397));System.out.println("kjz-demo-netty bio server start done. ");//进入循环等待客户端连接请求:while (true) {//在循环中调用accept方法接受客户端的连接请求,一旦有客户端连接成功,// 就创建一个新的Socket对象来处理与该客户端的通信。Socket socket = serverSocket.accept();//创建BioServerHandler线程处理客户端的请求和数据传输BioServerHandler handler = newBioServerHandler(socket, Charset.forName("GBK"));handler.start();}} catch (IOException e) {e.printStackTrace();}}
}

服务端消息处理器

package com.kjz.NettyDemo.Bio.server;import com.kjz.NettyDemo.Bio.ChannelAdapter;
import com.kjz.NettyDemo.Bio.ChannelHandler;import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
//服务端消息处理器
public class BioServerHandler extends ChannelAdapter {public BioServerHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("连接LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ BioServer to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}
}

通用消息处理器

package com.kjz.NettyDemo.Bio;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.Charset;
//通用消息处理器
public abstract class ChannelAdapter extends Thread {private Socket socket;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;while (!socket.isConnected()) {break;}channelHandler = new ChannelHandler(this.socket, charset);channelActive(channelHandler);}@Overridepublic void run() {try {BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));String str = null;while ((str = input.readLine()) != null) {channelRead(channelHandler, str);}} catch (IOException e) {e.printStackTrace();}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Bio;import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;
//通道处理器
public  class ChannelHandler {private Socket socket;private Charset charset;public ChannelHandler(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;}//消息写入public void writeAndFlush(Object msg) {OutputStream out = null;try {out = socket.getOutputStream();out.write((msg.toString()).getBytes(charset));out.flush();} catch (IOException e) {e.printStackTrace();}}public Socket socket() {return socket;}
}

NIO模型 基础补充

是Java NIO中的一个重要组件,用于实现非阻塞IO操作。它可以通过一个线程同时监听多个的IO事件,从而实现高效的IO多路复用。

的主要作用是管理多个,并监听这些上的IO事件。它可以通过调用()方法阻塞等待就绪的IO事件,然后返回就绪的IO事件的数量。通过()方法可以获取到已经就绪的IO事件的集合,然后可以遍历这个集合进行相应的处理。

的常用方法包括:

java程序设计之网络编程_javanio网络编程_

open():创建一个新的对象。

close():关闭对象。

():阻塞等待就绪的IO事件,返回就绪的IO事件的数量。

(long ):阻塞等待就绪的IO事件,最多等待毫秒,返回就绪的 IO事件的数量。

():非阻塞立即返回就绪的IO事件的数量。

():唤醒阻塞在()方法上的线程。

keys():返回当前注册在上的所有的。

():返回已经就绪的IO事件的集合。

在使用时,需要将注册到上,并指定感兴趣的IO事件,如读、写、连接、接收等事件。通过可以获取到注册的以及感兴趣的IO事件。

执行基本流程

从服务端的角度进行分析:

启动服务器时,创建一个实例,并打开一个通道,并对进行响应配置。

使用绑定指定的端口号,并设置最大连接数(.().bind(new (port), 1024))。

将注册到上,设置关注的事件为.,表示对客户端连接事件感兴趣(.(, .))。

创建一个实例,并将和字符集传递给它。

开始进入事件循环(while(true)),不断轮询上发生的事件。

调用的()方法,阻塞等待事件发生。

当某个事件发生时,()方法返回,并返回一组发生事件的集合。

遍历处理每个,判断其对应的事件类型。

如果是事件,表示有客户端连接请求,调用的()方法处理连接请求。

如果是事件,表示有数据可读,调用的()方法处理读取的数据。

在()和()方法中,可以通过的()方法将要发送的数据写入通道。

继续循环执行步骤6-12,处理下一个事件。

代码实现

客户端

package com.kjz.NettyDemo.Nio.client;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class NioClient {public static void main(String[] args) throws IOException {//创建Selector对象Selector selector = Selector.open();//创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();//配置为非阻塞模式socketChannel.configureBlocking(false);//尝试建立连接boolean isConnect = socketChannel.connect(new InetSocketAddress("192.168.1.116", 7397));if (isConnect) {//如果连接成功,则表示该SocketChannel已经可以进行读操作(OP_READ),// 因此将其注册到Selector上,等待IO事件的发生。socketChannel.register(selector, SelectionKey.OP_READ);} else {//如果连接失败,则需要等待连接建立完成(OP_CONNECT)的IO事件。// 同样将该SocketChannel注册到Selector上,等待IO事件的发生。socketChannel.register(selector, SelectionKey.OP_CONNECT);}System.out.println("kjz-demo-netty nio client start done.");//创建客户端消息处理器对象,,并传入Selector和字符集参数,然后启动该对象的线程。new NioClientHandler(selector, Charset.forName("GBK")).start();}}

客户端消息处理器

package com.kjz.NettyDemo.Nio.client;import com.kjz.NettyDemo.Nio.ChannelAdapter;
import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
//客户端消息处理器
public class NioClientHandler extends ChannelAdapter {public NioClientHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());//向服务端响应连接成功的信息ctx.writeAndFlush("hi! My name is KJZ NioClient to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+ " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}

服务端

package com.kjz.NettyDemo.Nio.server;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.Charset;public class NioServer {private Selector selector;private ServerSocketChannel socketChannel;public static void main(String[] args) throws IOException {new NioServer().bind(7397);}public void bind(int port) {try {selector = Selector.open();socketChannel = ServerSocketChannel.open();socketChannel.configureBlocking(false);socketChannel.socket().bind(new InetSocketAddress(port), 1024);socketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("itstack-demo-netty nio server start done. ");new NioServerHandler(selector, Charset.forName("GBK")).start();} catch (IOException e) {e.printStackTrace();}}}

服务端消息处理器

package com.kjz.NettyDemo.Nio.server;import com.kjz.NettyDemo.Nio.ChannelAdapter;
import com.kjz.NettyDemo.Nio.ChannelHandler;import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
//服务端消息处理器
public class NioServerHandler extends ChannelAdapter {public NioServerHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("连接报告LocalAddress:" + ctx.channel().getLocalAddress());ctx.writeAndFlush("hi! My name is KJZ NioServer to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我已经收到你的消息Success!\r\n");}}

通用消息处理器

package com.kjz.NettyDemo.Nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;//通用消息处理类
public abstract class ChannelAdapter extends Thread {private Selector selector;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Selector selector, Charset charset) {this.selector = selector;this.charset = charset;}//线程执行逻辑@Overridepublic void run() {while (true) {try {//阻塞等待就绪的IO事件/*** 常见的等待就绪的IO事件包括:* 可读事件(OP_READ):表示SocketChannel中有数据可读取。* 可写事件(OP_WRITE):表示SocketChannel可以写入数据。* 连接建立完成事件(OP_CONNECT):表示SocketChannel的连接已经建立完成。* 新的客户端连接事件(OP_ACCEPT):表示ServerSocketChannel有新的客户端连接请求。*/selector.select(1000);//遍历等待的IO事件Set selectedKeys = selector.selectedKeys();Iterator it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();handleInput(key);}} catch (Exception ignore) {}}}//处理IO事件private void handleInput(SelectionKey key) throws IOException {if (!key.isValid()) return;// 获取IO事件类型Class superclass = key.channel().getClass().getSuperclass();//根据不同的IO类型进行处理//客户端SocketChannelif (superclass == SocketChannel.class){SocketChannel socketChannel = (SocketChannel) key.channel();if (key.isConnectable()) {//判断是否连接成功if (socketChannel.finishConnect()) {channelHandler = newChannelHandler(socketChannel, charset);channelActive(channelHandler);//事件注册socketChannel.register(selector,SelectionKey.OP_READ);} else {System.exit(1);}}}// 服务端ServerSocketChannelif (superclass == ServerSocketChannel.class){if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();//通过accept方法获取SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();//配置为非阻塞模式socketChannel.configureBlocking(false);//注册读事件(OP_READ)socketChannel.register(selector, SelectionKey.OP_READ);//创建ChannelHandler对象,并调用channelActive方法进行连接通知。channelHandler = new ChannelHandler(socketChannel, charset);channelActive(channelHandler);}}//如果是读事件(isReadableif (key.isReadable()) {//从key中获取SocketChannel对象SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);//从SocketChannel中读取数据到ByteBuffer中int readBytes = socketChannel.read(readBuffer);if (readBytes > 0) {//调用flip()方法将Buffer从写模式切换为读模式readBuffer.flip();//根据剩余可读数据的长度,创建一个字节数组。byte[] bytes = new byte[readBuffer.remaining()];//将缓冲区中的数据读取到字节数组中readBuffer.get(bytes);channelRead(channelHandler, new String(bytes, charset));} else if (readBytes < 0) {key.cancel();socketChannel.close();}}}// 连接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

通道处理器

package com.kjz.NettyDemo.Nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
//通道处理器
public class ChannelHandler {private SocketChannel channel;private Charset charset;public ChannelHandler(SocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}//将数据写入通道public void writeAndFlush(Object msg) {try {byte[] bytes = msg.toString().getBytes(charset);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);} catch (IOException e) {e.printStackTrace();}}public SocketChannel channel() {return channel;}}

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了