前言:
最近不少读者都在关注“javaso”相关的内容,想了解更多这方面的资料。小编在网上整理了一些与“javaso”相关的文章,希望对大家有所帮助,下面一起来了解一下吧!
说明:
在Reactor经典模型中,Reactor查询到NIO就绪的事件后,分发到Handler,由Handler完成NIO操作和计算的操作。
ChannelHandler 用于处理一个 I/O 事件或者拦截一个 I/O 操作,并在它所属的 pipeline 中将其递交给相邻的下一个 handler。该接口体系中需要实现的方法很多,一般可通过继承 ChannelHandlerAdapter 的子类(例如下文示例中使用的 ChannelInboundHandlerAdapter)来代替直接实现接口。
package io.netty.channel;public interface ChannelInboundHandler extends ChannelHandler { // 注册事件 void channelRegistered(ChannelHandlerContext var1) throws Exception; // void channelUnregistered(ChannelHandlerContext var1) throws Exception; // void channelActive(ChannelHandlerContext var1) throws Exception; // void channelInactive(ChannelHandlerContext var1) throws Exception; // void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; // void channelReadComplete(ChannelHandlerContext var1) throws Exception; // void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; // void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; // void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;}
package io.netty.channel;

import java.net.SocketAddress;

/**
 * Callback contract for outbound (application-to-peer) channel operations, as
 * quoted from Netty. Operations that complete asynchronously carry a
 * {@link ChannelPromise} that is to be notified of the outcome.
 *
 * <p>The per-method summaries below follow Netty's published Javadoc for this
 * interface; nothing in this listing itself defines their behavior.
 */
public interface ChannelOutboundHandler extends ChannelHandler {

    /** A bind to {@code localAddress} was requested. */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /** A connect to {@code remoteAddress} (optionally from {@code localAddress}) was requested. */
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                 ChannelPromise promise) throws Exception;

    /** A disconnect was requested. */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** A close was requested. */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** A deregistration from the current event loop was requested. */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /** Intercepts a read request made through {@code ctx}. */
    void read(ChannelHandlerContext ctx) throws Exception;

    /** A write of {@code msg} was requested. */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /** A flush of previously written messages was requested. */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

代码案例(传输字节):
package com.what21.netty.channel.bytes;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServerDemo { public static void main(String[] args) throws InterruptedException { ServerBootstrap server = new ServerBootstrap(); EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); server.group(parentGroup, childGroup); server.option(ChannelOption.SO_BACKLOG, 128); server.channel(NioServerSocketChannel.class); server.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new SocketBytesHandler()); } }); ChannelFuture future = server.bind(11112).sync(); future.channel().closeFuture().sync(); }}
package com.what21.netty.channel.bytes;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import lombok.extern.slf4j.Slf4j;@Slf4jpublic class SocketBytesHandler extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("1、handlerAdded()"); super.handlerAdded(ctx); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("2、channelRegistered()"); super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("3、channelActive()"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("4、channelRead()"); ByteBuf byteBuf = (ByteBuf) msg; System.out.println("byteBuf=" + byteBuf); int requestLength = byteBuf.readByte(); byte[] bytes = new byte[requestLength]; byteBuf.readBytes(bytes); System.out.println("接收字符串:" + new String(bytes)); // 写数据到客户端 String response = "我是服务器端,你要怎么滴?"; byte[] responseBytes = response.getBytes(); ByteBuf responseByteBuf = Unpooled.copiedBuffer(responseBytes); byte[] lengthBytes = {(byte) responseBytes.length}; ByteBuf resLengByteBuf = Unpooled.copiedBuffer(lengthBytes); ctx.write(resLengByteBuf); ctx.writeAndFlush(responseByteBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("5、channelReadComplete()"); super.channelReadComplete(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("6、channelInactive()"); super.channelInactive(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("7、handlerRemoved()"); super.handlerRemoved(ctx); } @Override public void 
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught()"); super.exceptionCaught(ctx, cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("userEventTriggered()"); super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channelWritabilityChanged()"); super.channelWritabilityChanged(ctx); }}
package com.what21.netty.channel.bytes;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
 * Blocking-socket client for the "raw bytes" example.
 *
 * <p>Wire format in both directions: one length byte (0-255) followed by that
 * many payload bytes. Text uses the platform default charset, so client and
 * server must run with the same default.
 */
@Slf4j
public class SocketClientDemo {

    /**
     * Sends one request to the local server and prints its reply.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        // Must match the port the bytes-example server binds (11112).
        int port = 11112;
        // try-with-resources closes the socket (and its streams) on every path.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));
            socket.setKeepAlive(true);
            OutputStream out = socket.getOutputStream();
            DataInputStream in = new DataInputStream(socket.getInputStream());

            // Send the request: length byte, then payload.
            String msg = "我是客户端!";
            byte[] requestBytes = msg.getBytes();
            if (requestBytes.length > 255) {
                // OutputStream.write(int) keeps only the low 8 bits, which
                // would silently corrupt the length prefix.
                throw new IllegalArgumentException(
                        "Request too long for a 1-byte length prefix: " + requestBytes.length);
            }
            out.write(requestBytes.length);
            out.write(requestBytes);
            out.flush();

            // Receive the reply. readUnsignedByte/readFully throw EOFException
            // if the peer closes early, instead of yielding -1 or a short read.
            int length = in.readUnsignedByte();
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            System.out.println("响应消息为:" + new String(bytes));
        } catch (IOException e) {
            // Also covers UnknownHostException, which is an IOException.
            e.printStackTrace();
        }
    }
}

代码案例(编码后传输字节):
package com.what21.netty.channel.bytes2;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;public class NettyServerDemo { public static void main(String[] args) throws InterruptedException { ServerBootstrap server = new ServerBootstrap(); EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); server.group(parentGroup, childGroup); server.option(ChannelOption.SO_BACKLOG, 128); server.channel(NioServerSocketChannel.class); server.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 解码器自定义长度(ChannelInboundHandler) pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); // ChannelOutboundHandler pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast(new SocketByteHandler()); } }); ChannelFuture future = server.bind(11112).sync(); future.channel().closeFuture().sync(); }}
package com.what21.netty.channel.bytes2;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class SocketByteHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中 result.readBytes(result1); String resultStr = new String(result1); System.out.println("Client said:" + resultStr); // 释放资源,这行很关键 result.release(); String response = "I am ok!"; // 在当前场景下,发送的数据必须转换成ByteBuf数组 ByteBuf encoded = ctx.alloc().buffer(4 * response.length()); encoded.writeBytes(response.getBytes()); ctx.write(encoded); ctx.flush(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); ctx.flush(); }}
package com.what21.netty.channel.bytes2;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
 * Blocking-socket client for the "length-framed bytes" example.
 *
 * <p>Wire format in both directions: a 4-byte big-endian length followed by
 * that many payload bytes. Text uses the platform default charset, so client
 * and server must run with the same default.
 */
@Slf4j
public class SocketClientDemo {

    /** Upper bound accepted for a response frame, to avoid absurd allocations. */
    private static final int MAX_RESPONSE_LENGTH = 1024 * 1024;

    /**
     * Sends one framed request to the local server and prints its framed reply.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String host = "127.0.0.1";
        int port = 11112;
        // try-with-resources closes the socket (and its streams) on every path.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port));
            socket.setKeepAlive(true);
            OutputStream out = socket.getOutputStream();
            DataInputStream in = new DataInputStream(socket.getInputStream());

            // Send the request: 4-byte length header, then payload.
            ByteBuffer header = ByteBuffer.allocate(4);
            String msg = "我是客户端!";
            byte[] msgBytes = msg.getBytes();
            header.putInt(msgBytes.length);
            out.write(header.array());
            out.write(msgBytes);
            out.flush();

            // Receive the reply header. readFully blocks until all 4 bytes
            // have arrived (a plain read may return fewer).
            byte[] responseLengthBytes = new byte[4];
            in.readFully(responseLengthBytes);
            System.out.println("已读长度为:" + responseLengthBytes.length);

            int responseLength = ByteBuffer.wrap(responseLengthBytes).getInt();
            if (responseLength < 0 || responseLength > MAX_RESPONSE_LENGTH) {
                throw new IOException("Invalid response frame length: " + responseLength);
            }

            // Receive exactly the announced number of payload bytes.
            byte[] responseContentBytes = new byte[responseLength];
            in.readFully(responseContentBytes);
            if (responseLength > 0) {
                System.out.println("已读长度为:" + responseLength);
                String responseContent = new String(responseContentBytes);
                System.out.println("服务器端响应内容:" + responseContent);
            }
        } catch (IOException e) {
            // Also covers UnknownHostException, which is an IOException.
            e.printStackTrace();
        }
    }
}
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #javaso