前言:
眼前各位老铁们对“javaecho”大概比较珍视,同学们都想要剖析一些“javaecho”的相关文章。那么小编在网上收集了一些有关“javaecho””的相关知识,希望大家能喜欢,我们快快来学习一下吧!介绍
前端时间用了不少Netty的内容,但总也是理解的不够深入,今天打开官网,从官网最简单的案例开始,这里并进行一个记录,进行一个分享。
之前的参考:
Netty,事件驱动,阻塞和非阻塞通信=>
NIO的ByteBuffer,Netty缓冲区ByteBuf及内部结构设计=>
Netty,Socket客户端与Netty服务器端通信,只传输字节案例=>
Netty,实现HTTP服务器案例,实现简单的TCP通信案例=>
ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现,ChannelInboundHandler提供了各种可以重写的事件处理程序方法。
代码共享
弃包案例
官方说明:世界上最简单的协议不是:'Hello, World!',而是:DISCARD,没有任何响应的情况下丢弃任何接收到的数据。
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 丢弃任何传入数据 */public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new DiscardServer(8080).run(); }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) // 重写channelRead()事件处理程序方法 // 每当从客户机接收到新数据时,就会使用接收到的消息调用此方法 // 接收到的消息的类型为ByteBuf // 以静默方式丢弃接收到的数据 // ByteBuf是一个引用计数对象,release()方法显式释放 ((ByteBuf) msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) // I/O错误或处理程序实现因处理事件时引发的异常而引发异常时, // exceptionCaught()事件处理程序方法将使用Throwable调用。 // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }}
Echo交互
当连接打开时发送一条消息,并将收到的任何数据回显到服务器,echo客户端通过向服务器发送第一条消息来启动echo客户端和服务器之间的乒乓通信。
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.SelfSignedCertificate;/** * 回显从客户端接收到的任何数据 */public final class EchoServer { static final boolean SSL = true; static final int PORT = 8007; public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(msg); ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 引发异常时关闭连接. cause.printStackTrace(); ctx.close(); }}
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;/** * 当连接打开时发送一条消息,并将收到的任何数据回显到服务器。 * echo客户端通过向服务器发送第一条消息来启动echo客户端和服务器之间的乒乓通信。 */public final class EchoClient { static final boolean SSL = true; static final String HOST = "127.0.0.1"; static final int PORT = 8007; static final int SIZE = 256; public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }}
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * echo客户端的处理程序实现 */public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /** * 创建客户端处理程序 */ public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { // 第一次执行 System.out.println("已发送:" + firstMessage); ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("已接收:" + msg); ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 引发异常时关闭连接 cause.printStackTrace(); ctx.close(); }}
时间服务通信
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 编写时间服务器 */public class TimeServer { private int port; public TimeServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new TimeServer(44456).run(); }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) { // (1) final ByteBuf time = ctx.alloc().buffer(4); // (2) time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3) f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; ctx.close(); } }); // (4) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * 编写时间客户端 */public class TimeClient { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 44456; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // 启动客户端. ChannelFuture f = b.connect(host, port).sync(); // (5) // 等待连接关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } }}
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.Date;public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; // (1) try { long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
标签: #javaecho