前言:
此刻我们对“nginx代理netty实现wss”大体比较看重,小伙伴们都想要知道一些“nginx代理netty实现wss”的相关资讯。那么小编也在网上搜集了一些有关“nginx代理netty实现wss””的相关内容,希望你们能喜欢,姐妹们快快来了解一下吧!在之前的Socket学习中,主要都是基于两个Socket客户端:WebSocket和Socket.IO。在做测试的时候也是基于WebSocket消息的发送和接收为主要测试对象。但是对于超多Socket连接没有涉及。
在实践中会发现,这两个实现类都存在一个问题,为了维护1个Socket连接及其功能,通常需要创建多个线程。在计算机硬件资源有限的情况下,线程是稀缺资源,不仅仅是内存占用,也会增加CPU的负担。
之前解决这个问题的方案直接换成「Go」语言版本的Socket客户端。例如:/net/websocket和gorilla/websocket。
其实Java也有相对应的解决方案:「netty」。话不多说,上代码。
依赖
<!-- --><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.85.Final</version></dependency>netty WebSocket客户端
客户端主要的功能就是创建连接,然后使用一个事件处理线程池管理连接以及收发消息io.netty.channel.EventLoopGroup,然后使用一个io.netty.bootstrap.Bootstrap来作为引导程序。
package com.funtester.socket.netty import com.funtester.frame.execute.ThreadPoolUtil import groovy.util.logging.Log4j2 import io.netty.bootstrap.Bootstrap import io.netty.channel.* import io.netty.channel.group.ChannelGroup import io.netty.channel.group.DefaultChannelGroup import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.codec.http.DefaultHttpHeaders import io.netty.handler.codec.http.HttpClientCodec import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory import io.netty.handler.codec.http.websocketx.WebSocketVersion import io.netty.handler.stream.ChunkedWriteHandler import io.netty.util.concurrent.GlobalEventExecutor @Log4j2 class WebSocketConnector { static Bootstrap bootstrap = new Bootstrap() /** * 处理事件的线程池 */ static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N")) static { bootstrap.group(group).channel(NioSocketChannel.class) } /** * 用于记录和管理所有客户端的channel */ static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) WebSocketClientHandshaker handShaker ChannelPromise handshakeFuture String host int port /** * 网络通道 */ Channel channel WebSocketIoHandler handler /** * WebSocket协议类型的模拟客户端连接器构造方法 * * @param serverIp * @param serverSocketPort * @param group */ WebSocketConnector(String host, int port) { this.host = host this.port = port String URL = this.host + ":" + this.port + "/test" URI uri = new URI(URL) handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())) bootstrap.option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_TIMEOUT, true) .option(ChannelOption.SO_BROADCAST, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() pipeline.addLast(new HttpClientCodec()) pipeline.addLast(new ChunkedWriteHandler()) pipeline.addLast(new HttpObjectAggregator(1024 * 1024)) pipeline.addLast(handler) } }) } /** * 连接 */ void connect() { try { try { ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync() this.channel = future.channel() clients.add(channel) } catch (e) { log.error("创建channel失败", e) } } catch (Exception e) { log.error("连接服务失败", e) } finally { this.handshakeFuture = handler.handshakeFuture() } } /** * 关闭 */ void close() { this.channel.close() } }
这里用到了一个保存现在的所有的活跃channel的类io.netty.channel.group.ChannelGroup,有点就是可以自动管理所有的channel,还能自动剔除已经关闭的channel。
这里还有补充2个发送消息的方法:
/** * 发送文本消息 */ void sendText(String msg) { channel.writeAndFlush(new TextWebSocketFrame(msg)) } /** * 发送ping消息 */ void ping() { channel.writeAndFlush(new PingWebSocketFrame()) }消息处理器
这里需要处理的消息各种类型,继承io.netty.channel.SimpleChannelInboundHandler实现不同的方法即可。
这里有个泛型设置可以直接设置成不同的消息类型,例如io.netty.handler.codec.http.websocketx.WebSocketFrame及其子类,如果确定服务端发来消息的类型的话,可以更加省事儿。
package com.funtester.socket.netty import groovy.util.logging.Log4j2 import io.netty.channel.* import io.netty.channel.group.ChannelGroup import io.netty.channel.group.DefaultChannelGroup import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.websocketx.* import io.netty.handler.timeout.IdleState import io.netty.handler.timeout.IdleStateEvent import io.netty.util.concurrent.GlobalEventExecutor /** * WebSocket协议类型的模拟客户端IO处理器类 */ @Log4j2 class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> { /** * 用于记录和管理所有客户端的channel */ private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) private final WebSocketClientHandshaker handShaker private ChannelPromise handshakeFuture WebSocketIoHandler(WebSocketClientHandshaker handShaker) { this.handShaker = handShaker } ChannelFuture handshakeFuture() { return handshakeFuture } @Override void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise() } @Override void channelActive(ChannelHandlerContext ctx) { handShaker.handshake(ctx.channel()); } @Override void channelInactive(ChannelHandlerContext ctx) { ctx.close() try { super.channelInactive(ctx) } catch (Exception e) { log.error("channelInactive 异常.", e) } log.warn("WebSocket链路与服务器连接已断开.") } @Override void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel() if (!handShaker.isHandshakeComplete()) { try { handShaker.finishHandshake(ch, (FullHttpResponse) msg) handshakeFuture.setSuccess() } catch (WebSocketHandshakeException e) { log.warn("WebSocket Client failed to connect",e) handshakeFuture.setFailure(e) } return } WebSocketFrame frame = (WebSocketFrame) msg if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame String s = textFrame.text() } else if (frame instanceof CloseWebSocketFrame) { log.info("WebSocket Client closing") ch.close() } } @Override void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause) if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause) } ctx.close() super.exceptionCaught(ctx, cause) } @Override void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt // 如果写通道处于空闲状态,就发送心跳命令 if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) { // 发送心跳数据 def channel = ctx.channel() channel.writeAndFlush(new TextWebSocketFrame("dsf")) } } else { super.userEventTriggered(ctx, evt) } } }
这里处理接收到消息的时候并没有选择保存消息的功能,因为netty WebSocket使用场景就是超大量(超过1w)连接。保留返回消息,进行业务验证通常不是这类测试场景的首要目的。所以以后等用到了再说吧。
后面会对比这3种Socket客户端包括Go语言两种Socket客户端在超大量连接方面的资源占用。