龙空技术网

Java,Netty,官方案例,弃包、Echo和时间服务通信代码案例分享

古怪今人 181

前言:

目前各位读者对“javaecho”大概都比较重视,大家都想要了解一些“javaecho”的相关文章。小编在网上收集了一些有关“javaecho”的相关知识,希望大家能喜欢,我们快来学习一下吧!

介绍

前段时间用了不少Netty的内容,但总是理解得不够深入。今天打开官网,从官网最简单的案例开始,在这里做一个记录和分享。

之前的参考:

Netty,事件驱动,阻塞和非阻塞通信=>

NIO的ByteBuffer,Netty缓冲区ByteBuf及内部结构设计=>

Netty,Socket客户端与Netty服务器端通信,只传输字节案例=>

Netty,实现HTTP服务器案例,实现简单的TCP通信案例=>

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现,ChannelInboundHandler提供了各种可以重写的事件处理程序方法。

代码共享

弃包案例

官方说明:世界上最简单的协议不是“Hello, World!”,而是DISCARD——它在没有任何响应的情况下丢弃任何接收到的数据。

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 丢弃任何传入数据 */public class DiscardServer {    private int port;    public DiscardServer(int port) {        this.port = port;    }    public void run() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class) // (3)                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new DiscardServerHandler());                        }                    })                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)            // Bind and start to accept incoming connections.            ChannelFuture f = b.bind(port).sync(); // (7)            // Wait until the server socket is closed.            // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new DiscardServer(8080).run();    }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)        // 重写channelRead()事件处理程序方法        // 每当从客户机接收到新数据时,就会使用接收到的消息调用此方法        // 接收到的消息的类型为ByteBuf        // 以静默方式丢弃接收到的数据        // ByteBuf是一个引用计数对象,release()方法显式释放        ((ByteBuf) msg).release(); // (3)    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)        // I/O错误或处理程序实现因处理事件时引发的异常而引发异常时,        // exceptionCaught()事件处理程序方法将使用Throwable调用。        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}

Echo交互

客户端在连接打开时发送一条消息,并将收到的任何数据回显给服务器。echo客户端通过向服务器发送第一条消息,来启动echo客户端和服务器之间的乒乓通信。

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.SelfSignedCertificate;/** * 回显从客户端接收到的任何数据 */public final class EchoServer {    static final boolean SSL = true;    static final int PORT = 8007;    public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }        // Configure the server.        
EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 100)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline p = ch.pipeline();                            if (sslCtx != null) {                                p.addLast(sslCtx.newHandler(ch.alloc()));                            }                            //p.addLast(new LoggingHandler(LogLevel.INFO));                            p.addLast(new EchoServerHandler());                        }                    });            // Start the server.            ChannelFuture f = b.bind(PORT).sync();            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}
import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        System.out.println(msg);        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 引发异常时关闭连接.        cause.printStackTrace();        ctx.close();    }}
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;/** * 当连接打开时发送一条消息,并将收到的任何数据回显到服务器。 * echo客户端通过向服务器发送第一条消息来启动echo客户端和服务器之间的乒乓通信。 */public final class EchoClient {    static final boolean SSL = true;    static final String HOST = "127.0.0.1";    static final int PORT = 8007;    static final int SIZE = 256;    public static void main(String[] args) throws Exception {        // Configure SSL.git        final SslContext sslCtx;        if (SSL) {            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();        } else {            sslCtx = null;        }        // Configure the client.        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline p = ch.pipeline();                            if (sslCtx != null) {                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));                            }                            //p.addLast(new LoggingHandler(LogLevel.INFO));                            p.addLast(new EchoClientHandler());                        }                    });            // Start the client.         
   ChannelFuture f = b.connect(HOST, PORT).sync();            // Wait until the connection is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down the event loop to terminate all threads.            group.shutdownGracefully();        }    }}
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * echo客户端的处理程序实现 */public class EchoClientHandler extends ChannelInboundHandlerAdapter {    private final ByteBuf firstMessage;    /**     * 创建客户端处理程序     */    public EchoClientHandler() {        firstMessage = Unpooled.buffer(EchoClient.SIZE);        for (int i = 0; i < firstMessage.capacity(); i++) {            firstMessage.writeByte((byte) i);        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        // 第一次执行        System.out.println("已发送:" + firstMessage);        ctx.writeAndFlush(firstMessage);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("已接收:" + msg);        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 引发异常时关闭连接        cause.printStackTrace();        ctx.close();    }}

时间服务通信

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 编写时间服务器 */public class TimeServer {    private int port;    public TimeServer(int port) {        this.port = port;    }    public void run() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class) // (3)                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new TimeServerHandler());                        }                    })                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)            // Bind and start to accept incoming connections.            ChannelFuture f = b.bind(port).sync(); // (7)            // Wait until the server socket is closed.            // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new TimeServer(44456).run();    }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class TimeServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(final ChannelHandlerContext ctx) { // (1)        final ByteBuf time = ctx.alloc().buffer(4); // (2)        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));        final ChannelFuture f = ctx.writeAndFlush(time); // (3)        f.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) {                assert f == future;                ctx.close();            }        }); // (4)    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * 编写时间客户端 */public class TimeClient {    public static void main(String[] args) throws Exception {        String host = "127.0.0.1";        int port = 44456;        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap(); // (1)            b.group(workerGroup); // (2)            b.channel(NioSocketChannel.class); // (3)            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)            b.handler(new ChannelInitializer<SocketChannel>() {                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ch.pipeline().addLast(new TimeClientHandler());                }            });            // 启动客户端.            ChannelFuture f = b.connect(host, port).sync(); // (5)            // 等待连接关闭            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();        }    }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.Date;public class TimeClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ByteBuf m = (ByteBuf) msg; // (1)        try {            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;            System.out.println(new Date(currentTimeMillis));            ctx.close();        } finally {            m.release();        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}

标签: #javaecho