龙空技术网

高性能netty之多通道实现

编码是个技术活 796

前言:

今天姐妹们对“nettytcp最大连接数”可能比较着重,同学们都需要学习一些“nettytcp最大连接数”的相关内容。那么小编也在网摘上汇集了一些对于“nettytcp最大连接数””的相关文章,希望同学们能喜欢,我们快快来了解一下吧!



一、netty简介

Netty的主要目的是基于NIO构建具有网络和业务逻辑组件的分离和松耦合的高性能协议服务器。它可以实现多种协议,例如HTTP或你自己的特定协议。

Netty有一系列丰富的特性:

有一套统一的API来处理异步和同步编程模式使用非常灵活简单但却强大的线程机制业务组件分离方便重用极小的缩减不必要的Memory Copy二、netty核心概念

Netty是一个非阻塞框架。与阻塞IO相比,这导致高吞吐量。了解无阻塞IO对于了解Netty的核心组件及其关系至关重要。

三、netty核心组件

1.Channel

Channel是Java NIO的基础。它表示一个开放的连接,能够执行IO操作,例如读取和写入。简单的说,Channel 就是代表连接,实体之间的连接,程序之间的连接,文件之间的连接,设备之间的连接。同时它也是数据入站和出站的载体。

2.Future

Netty 通道中的每个IO操作都是非阻塞的。这意味着调用后立即返回所有操作。标准Java库中有一个Future接口,但是对于Netty而言并不方便-我们只能向Future询问操作的完成情况,或在操作完成之前阻塞当前线程。这就是Netty拥有自己的ChannelFuture接口的原因。我们可以将回调传递给ChannelFuture,该回调将在操作完成时被调用。

3.EventLoop 和 EventLoopGroup

既然有了 Channel 连接服务,让信息之间可以流动。如果服务发出的消息称作“出站”消息,服务接受的消息称作“入站”消息。那么消息的“出站”/“入站”就会产生事件(Event)。例如:连接已激活;数据读取;用户事件;异常事件;打开链接;关闭链接等等。

在netty中一个Channel都会分配一个EventLoop,一个EventLoop可以服务于多个Channel。

EventLoopGroup,可以理解为EventLoop组,一个EventLoopGroup包含了多个EventLoop。

4.Handlers

Netty提供了ChannelHandler实现的巨大层次结构。值得注意的是适配器只是空的实现,例如ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。当我们只需要处理所有事件的子集时,可以扩展这些适配器。而且,有许多特定协议(例如HTTP)的实现,例如HttpRequestDecoder,HttpResponseEncoder,HttpObjectAggregator。

5.Encoders and Decoders

在使用网络协议时,我们需要执行数据序列化和反序列化。为此,Netty 为能够解码传入数据的解码器引入了ChannelInboundHandler的特殊扩展。大多数解码器的基类是ByteToMessageDecoder。

为了对传出的数据进行编码,Netty具有ChannelOutboundHandler的扩展,称为编码器。MessageToByteEncoder是大多数编码器实现的基础。我们可以使用编码器和解码器将消息从字节序列转换为Java对象,反之亦然。

四,netty多通道

上面我们对netty做了简单介绍及了解了netty相关的核心组件,下面我们讨论下netty多通道的实现。

一般我们通过 handler() 或childHandler() 都只添加了一个 Channel通道,对于复杂的应用单通道可能无法满足要求,这里我通过示例介绍一种多通道的实现方式:

1.Netty服务端:
nettyServer.java

@Componentpublic class NettyServer {/**netty主机*/@Value("${rzt.inquiry.netty-host}")private String nettyHost;/**netty端口*/@Value("${rzt.inquiry.netty-port}")private Integer nettyPort;/**netty最大长度*/@Value("${rzt.inquiry.netty-maxlength}")private Integer nettyMaxLength;/**通过nio方式来接收连接和处理连接*/private EventLoopGroup boss = new NioEventLoopGroup();/**worker*/private EventLoopGroup work = new NioEventLoopGroup();/**创建bootstrap*/private ServerBootstrap bootstrap = new ServerBootstrap();/**通道适配器*/@Resourceprivate NettyServerHandler nettyServerHandler;/**NETTY编码格式*/public final String CHARSET = "UTF-8";/*** 启动netty*/public void start() {try {bootstrap.group(boss, work);bootstrap.channel(NioServerSocketChannel.class);bootstrap.option(ChannelOption.SO_BACKLOG, 100);bootstrap.handler(new LoggingHandler(LogLevel.INFO));/**设置过滤器(设置事件处理)*/bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline cp = ch.pipeline();//1. 添加心跳支持(每隔指定时间来检查一下channelRead方法被调用的情况,如果在指定时间内该链上的channelRead方法都没有被触发,就会调用userEventTriggered)cp.addLast(new IdleStateHandler(0, 0, 0, TimeUnit.DAYS));/**设置已@CRT为分隔符*/ByteBuf delimiter = Unpooled.copiedBuffer(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER.getBytes());cp.addLast("framer",new DelimiterBasedFrameDecoder(nettyMaxLength * nettyMaxLength, false, delimiter));/**解码器*/cp.addLast("decoder", new StringDecoder(Charset.forName(CHARSET)));/**编码器*/cp.addLast("encoder", new StringEncoder(Charset.forName(CHARSET)));cp.addLast(new DefaultEventExecutorGroup(8));/**服务端业务逻辑*/cp.addLast("handler", nettyServerHandler);}});/**服务端绑定IP及端口监听*/MySlf4j.textInfo("netty服务器在[{0}]端口启动监听", nettyPort);ChannelFuture futrue = bootstrap.bind(nettyHost, nettyPort).sync();/**监听服务器关闭监听*/futrue.channel().closeFuture().sync();} catch (InterruptedException ex) {MySlf4j.textError("netty服务端启动异常{0}", MySlf4j.ExceptionToString(ex));boss.shutdownGracefully();work.shutdownGracefully();}}/*** 关闭服务器方法*/@PreDestroypublic void close() {MySlf4j.textInfo("关闭netty服务端....");//优雅退出boss.shutdownGracefully();work.shutdownGracefully();}}

NettyServerHandler.java

@Component@ChannelHandler.Sharablepublic class NettyServerHandler extends SimpleChannelInboundHandler<String> {@Autowiredprivate ICreditInquiryService creditInquiryService;/**释放通道数*/private int lossConnectCount = 0;/*** 服务端消息处理*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {msg = msg.replaceAll(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER, "");if (msg instanceof String) {if ("ping-pong-ping-pong".equals(msg)) {MySlf4j.textInfo("[心跳监测] {0}:通道活跃", channelHandlerContext.channel().id());lossConnectCount = 0;return;}}..........//接受netty客户端请求数据,并响应channelFuture = channelHandlerContext.writeAndFlush(responseBody);}/*** 触发器*/@Overridepublic void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object evt) throws Exception {MySlf4j.textInfo("[已经有xx小时没有接收到客户端消息]");if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;// READER_IDLE:一段时间内没有数据接收if (idleStateEvent.state() == IdleState.READER_IDLE) {lossConnectCount++;if (lossConnectCount > 2) {MySlf4j.textInfo("[释放不活跃通道] {0}", channelHandlerContext.channel().id());channelHandlerContext.channel().close();}}} else {super.userEventTriggered(channelHandlerContext, evt);}}}

2.netty客户端

nettyClient.java

@Componentpublic class NettyClient {/**netty主机*/@Value("${rzt.auth.netty-host}")private String nettyHost;/**netty端口*/@Value("${rzt.auth.netty-port}")private Integer nettyPort;/**初始化重试次数*/private static int retry = 0;/**初始化Bootstrap实例*/private Bootstrap bootstrap = new Bootstrap();/** 工人线程组*/private EventLoopGroup workerGroup = new NioEventLoopGroup();/**key为目标host,value为目标host的连接池*/public static ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;@Autowiredprivate NettyChannelPoolHandler nettyChannelPoolHandler;/*** netty客户端启动*/public void init() {MySlf4j.textInfo("netty客户端启动连接,host:{0},port:{1}", nettyHost, nettyPort);bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true);//1. 创建连接池mappoolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {@Overrideprotected SimpleChannelPool newPool(InetSocketAddress inetSocketAddress) {//maxConnections: 最大连接数,超过则进入pendingAcquireQueue等待获取连接return new FixedChannelPool(bootstrap.remoteAddress(inetSocketAddress), nettyChannelPoolHandler,20);}};InetSocketAddress addr1 = new InetSocketAddress(nettyHost, nettyPort);//创建4个channel连接发送消息for (int i = 0; i < 4; i++) {//2. 取出连接addr1地址的连接池final SimpleChannelPool pool1 = poolMap.get(addr1);//3. 获取一个连接Future<Channel> channelFuture = pool1.acquire();channelFuture.addListener(new FutureListener<Channel>() {@Overridepublic void operationComplete(Future<Channel> channelFuture) throws Exception {if (channelFuture.isSuccess()) {//连接地址1的某个channelChannel ch = channelFuture.getNow();//使用连接发送消息ch.writeAndFlush("ping-pong-ping-pong".concat(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER));//释放连接,将连接放回连接池pool1.release(ch);}}});}}}

NettyClientHandler.java

@Component@ChannelHandler.Sharablepublic class NettyClientHandler extends SimpleChannelInboundHandler<String> {@Autowiredprivate IAnalysisWriteService analysisWriteService;@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {MySlf4j.textInfo("[OVERLENGTH]客户端收到消息{0},消息长度:{1}", msg, msg.length());/** 创建线程池 */ExecutorService excutorService = Executors.newFixedThreadPool(5);excutorService.execute(new Runnable() {@Overridepublic void run() {//通过线程池处理响应数据.....}});}}

服务启动后,我们通过netstat -an就可以看到建立了4条Channel。


标签: #nettytcp最大连接数