龙空技术网

Java,Netty,Socket客户端与Netty服务器端通信,只传输字节案例

古怪今人 201

前言:

现在大家对“javaso”比较关注,都想了解一些“javaso”的相关资讯。小编也在网上搜集了一些关于“javaso”的相关文章,希望大家喜欢,一起来了解一下吧!

说明:

在Reactor经典模型中,Reactor查询到NIO就绪的事件后,分发到Handler,由Handler完成NIO操作和计算的操作。

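ChannelHandler用于处理一个I/O事件或者拦截一个I/O操作,并在它所属的pipeline中将其递交给相邻的下一个handler。该接口(及其子接口ChannelInboundHandler、ChannelOutboundHandler)有许多方法需要实现,一般可通过继承ChannelHandlerAdapter或其子类(例如下文案例使用的ChannelInboundHandlerAdapter)来代替,只重写自己关心的方法即可。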

package io.netty.channel;public interface ChannelInboundHandler extends ChannelHandler {    // 注册事件    void channelRegistered(ChannelHandlerContext var1) throws Exception;    //    void channelUnregistered(ChannelHandlerContext var1) throws Exception;    //    void channelActive(ChannelHandlerContext var1) throws Exception;    //    void channelInactive(ChannelHandlerContext var1) throws Exception;    //    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;    //    void channelReadComplete(ChannelHandlerContext var1) throws Exception;    //    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;    //    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;    //    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;}
package io.netty.channel;import java.net.SocketAddress;public interface ChannelOutboundHandler extends ChannelHandler {    //    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;    //    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;    //    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    //    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    //    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;    //    void read(ChannelHandlerContext var1) throws Exception;    //    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;    //    void flush(ChannelHandlerContext var1) throws Exception;}
代码案例(传输字节):
package com.what21.netty.channel.bytes;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServerDemo {    public static void main(String[] args) throws InterruptedException {        ServerBootstrap server = new ServerBootstrap();        EventLoopGroup parentGroup = new NioEventLoopGroup();        EventLoopGroup childGroup = new NioEventLoopGroup();        server.group(parentGroup, childGroup);        server.option(ChannelOption.SO_BACKLOG, 128);        server.channel(NioServerSocketChannel.class);        server.childHandler(new ChannelInitializer<SocketChannel>() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {                ChannelPipeline pipeline = ch.pipeline();                pipeline.addLast(new SocketBytesHandler());            }        });        ChannelFuture future = server.bind(11112).sync();        future.channel().closeFuture().sync();    }}
package com.what21.netty.channel.bytes;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import lombok.extern.slf4j.Slf4j;@Slf4jpublic class SocketBytesHandler extends ChannelInboundHandlerAdapter {    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        System.out.println("1、handlerAdded()");        super.handlerAdded(ctx);    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {        System.out.println("2、channelRegistered()");        super.channelRegistered(ctx);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("3、channelActive()");        super.channelActive(ctx);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("4、channelRead()");        ByteBuf byteBuf = (ByteBuf) msg;        System.out.println("byteBuf=" + byteBuf);        int requestLength = byteBuf.readByte();        byte[] bytes = new byte[requestLength];        byteBuf.readBytes(bytes);        System.out.println("接收字符串:" + new String(bytes));        // 写数据到客户端        String response = "我是服务器端,你要怎么滴?";        byte[] responseBytes = response.getBytes();        ByteBuf responseByteBuf = Unpooled.copiedBuffer(responseBytes);        byte[] lengthBytes = {(byte) responseBytes.length};        ByteBuf resLengByteBuf = Unpooled.copiedBuffer(lengthBytes);        ctx.write(resLengByteBuf);        ctx.writeAndFlush(responseByteBuf);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("5、channelReadComplete()");        super.channelReadComplete(ctx);    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("6、channelInactive()");        
super.channelInactive(ctx);    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        System.out.println("7、handlerRemoved()");        super.handlerRemoved(ctx);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        System.out.println("exceptionCaught()");        super.exceptionCaught(ctx, cause);    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        System.out.println("userEventTriggered()");        super.userEventTriggered(ctx, evt);    }    @Override    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelWritabilityChanged()");        super.channelWritabilityChanged(ctx);    }}
package com.what21.netty.channel.bytes;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.InetSocketAddress;import java.net.Socket;import java.net.UnknownHostException;import java.nio.ByteBuffer;@Slf4jpublic class SocketClientDemo {    public static void main(String[] args) {        String host = "127.0.0.1";        int port = 11111;        try {            Socket socket = new Socket();            socket.connect(new InetSocketAddress(host, port));            socket.setKeepAlive(true);            OutputStream out = socket.getOutputStream();            InputStream in = socket.getInputStream();            // 客户端发送消息            String msg = "我是客户端!";            byte[] requestBytes = msg.getBytes();            out.write(requestBytes.length);            out.write(requestBytes);            out.flush();            // 客户端接收消息            int length = in.read();            byte[] bytes = new byte[length];            in.read(bytes);            System.out.println("响应消息为:" + new String(bytes));            out.close();        } catch (UnknownHostException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }}
代码案例(编码后传输字节):
package com.what21.netty.channel.bytes2;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;public class NettyServerDemo {    public static void main(String[] args) throws InterruptedException {        ServerBootstrap server = new ServerBootstrap();        EventLoopGroup parentGroup = new NioEventLoopGroup();        EventLoopGroup childGroup = new NioEventLoopGroup();        server.group(parentGroup, childGroup);        server.option(ChannelOption.SO_BACKLOG, 128);        server.channel(NioServerSocketChannel.class);        server.childHandler(new ChannelInitializer<SocketChannel>() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {                ChannelPipeline pipeline = ch.pipeline();                // 解码器自定义长度(ChannelInboundHandler)                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));                // ChannelOutboundHandler                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));                pipeline.addLast(new SocketByteHandler());            }        });        ChannelFuture future = server.bind(11112).sync();        future.channel().closeFuture().sync();    }}
package com.what21.netty.channel.bytes2;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class SocketByteHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg)            throws Exception {        ByteBuf result = (ByteBuf) msg;        byte[] result1 = new byte[result.readableBytes()];        // msg中存储的是ByteBuf类型的数据,把数据读取到byte[]中          result.readBytes(result1);        String resultStr = new String(result1);        System.out.println("Client said:" + resultStr);        // 释放资源,这行很关键          result.release();        String response = "I am ok!";        // 在当前场景下,发送的数据必须转换成ByteBuf数组          ByteBuf encoded = ctx.alloc().buffer(4 * response.length());        encoded.writeBytes(response.getBytes());        ctx.write(encoded);        ctx.flush();    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        super.channelReadComplete(ctx);        ctx.flush();    }}
package com.what21.netty.channel.bytes2;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.InetSocketAddress;import java.net.Socket;import java.net.UnknownHostException;import java.nio.ByteBuffer;@Slf4jpublic class SocketClientDemo {    public static void main(String[] args) {        String host = "127.0.0.1";        int port = 11112;        try {            Socket socket = new Socket();            socket.connect(new InetSocketAddress(host, port));            socket.setKeepAlive(true);            OutputStream out = socket.getOutputStream();            InputStream in = socket.getInputStream();            // 客户端发送消息            ByteBuffer header = ByteBuffer.allocate(4);            String msg = "我是客户端!";            byte[] msgBytes = msg.getBytes();            header.putInt(msgBytes.length);            out.write(header.array());            out.write(msgBytes);            out.flush();            byte[] responseLengthBytes = new byte[4];            int readI = in.read(responseLengthBytes);            System.out.println("已读长度为:" + readI);            // 客户端接收消息            byte[] responseContentBytes = new byte[1024];            int readed = in.read(responseContentBytes);            if (readed > 0) {                System.out.println("已读长度为:" + readed);                String responseContent = new String(responseContentBytes, 0, readed);                System.out.println("服务器端响应内容:" + responseContent);            }            out.close();        } catch (UnknownHostException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }}

标签: #javaso