前言:
目前同学们对“netty 连接数”大概比较关注,你们都想要分析一些“netty 连接数”的相关知识。那么小编也在网络上搜集了一些对于“netty 连接数””的相关内容,希望小伙伴们能喜欢,兄弟们快快来学习一下吧!在进行WebSocket协议连接或者WebSocket接口测试的时候,一旦遇到超大连接数量的场景中时,之前使用过的实现 Java-WebSocket 以及 Netty-WebSocket 两种实现就会显示出巨大的性能差距。当然 Netty-WebSocket 就是为了解决性能问题而来的。
so,今天我就来展示一下两个 WebSocket 实现在使用中具体的差异,本文集中在资源占用上,特别是线程占用。
理论差异Java-WebSocket
据可靠资料显示,两者的差异主要以在管理 WebSocket 连接时使用的线程数不同,以下是使用org.java_websocket.client.WebSocketClient创建WebSocket客户端时,它会创建以下几个线程:
「ConnectThread(连接线程)」:当你调用WebSocketClient.connect()方法时,WebSocket客户端会创建一个单独的线程来处理连接建立的过程。这个线程负责建立实际的WebSocket连接。「WriteThread(写线程)」:WebSocket客户端还会创建一个单独的线程,用于发送WebSocket消息。当你调用WebSocket.send()方法发送消息时,消息将被发送到这个线程,然后由该线程负责将消息写入到底层的WebSocket连接中。「ReadThread(读线程)」:WebSocket客户端会创建一个用于接收WebSocket消息的线程。这个线程会持续监听来自WebSocket服务器的消息,并在接收到消息时触发相应的事件处理器。
这些线程的存在使得WebSocket客户端能够在后台处理连接、发送和接收消息,而不会阻塞主线程。这有助于确保应用程序在与WebSocket服务器进行通信时能够保持响应性。
据资料显示不同版本的实现线程是不一样的,这里我没有找到具体的版本差异,也没有进行测试。
Netty-WebSocket
Netty其实并不存在上面这个问题,因为WebSocket连接和线程数并没有强的绑定关系。Netty只有一个处理事件的 io.netty.channel.EventLoopGroup 需要使用线程池设计,其他均没有设置线程和创建线程的设置。
被测服务
这里我用Go写了一个 WebSocket 的服务端,一来省事儿,二来性能高足以应付接下来的测试。服务端代码如下:
// CreateServer // @Description: 重建一个WebSocket服务 // @param port 端口 // @param path 路径 func CreateServer(port int, path string) { var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, HandshakeTimeout: 5 * time.Second, } http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { conn, _ := upgrader.Upgrade(w, r, nil) conn.WriteMessage(websocket.TextMessage, []byte("msg")) for { msgType, msg, err := conn.ReadMessage() if err != nil { log.Println(err) return } fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg)) if err = conn.WriteMessage(msgType, msg); err != nil { log.Println("ffahv") return } } }) http.ListenAndServe(":"+strconv.Itoa(port), nil) }单链接对比空Java进程
首先测试一下空的Java进行消耗的线程数详情,测试客户端如下:
import com.funtester.frame.SourceCode class Empty extends SourceCode{ static void main(String[] args) { waitForKey("按任意键退出") } }
运行时,进行监控:
空Java进行
Java-WebSocket
只创建1个WebSocket客户端,测试代码如下:
package com.funtest.websocketimport com.funtester.frame.SourceCodeimport com.funtester.socket.WebSocketFunClientclass WebSocket extends SourceCode { static String url = "ws://localhost:12345/test" static void main(String[] args) { def instance = WebSocketFunClient.getInstance(url) instance.connect() instance.send("Hello FunTester") waitForKey("按任意键退出") }}
运行线程监控:
WebSocket单线程
Netty-WebSocket
逻辑同上,代码如下:
package com.funtest.websocketimport com.funtester.frame.SourceCodeimport com.funtester.socket.netty.WebSocketConnectorimport groovy.util.logging.Log4j2@Log4j2class NettySocket extends SourceCode { static void main(String[] args) { String serverIp = "ws://127.0.0.1"; int serverPort = 12345; def h = {String x -> log.info("收到消息:{}", x) } WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test",h) client.connect() client.getHandshakeFuture().get() client.sendText("Hello FunTester").get() waitForKey("按任意键退出") }}
运行时线程监控:
Netty-WebSocket
结论
Java-WebSocket额外创建了3个线程,而Netty-WebSocket额外创建了1个线程。这里我采取了默认的 io.netty.channel.EventLoopGroup 创建策略。
1000连接Netty-WebSocket
测试代码如下:
package com.funtest.websocket import com.funtester.frame.SourceCode import com.funtester.socket.netty.WebSocketConnector import groovy.util.logging.Log4j2 @Log4j2 class NettySocket extends SourceCode { static void main(String[] args) { String serverIp = "ws://127.0.0.1"; int serverPort = 12345; def h = {String x -> log.info("收到消息:{}", x) } 1000.times { WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test", h) client.connect() client.getHandshakeFuture().get() client.sendText("Hello FunTester").get() } waitForKey("按任意键退出") } }
运行时线程监控:
Netty1000连接
Java-WebSocket
由于创建实在太慢了,我测试了100个连接,测试代码如下:
package com.funtest.websocket import com.funtester.frame.SourceCode import com.funtester.socket.WebSocketFunClient class WebSocket extends SourceCode { static String url = "ws://localhost:12345/test" static void main(String[] args) { 100.times { fun { def instance = WebSocketFunClient.getInstance(url) instance.connect() instance.send("Hello FunTester") } } waitForKey("按任意键退出") } }
运行时线程监控:
WebSocket1000连接
Netty极限
如果我们只是单纯测试连接数量的话,并没有必要创建很多处理WebSocket事件的线程,我们可以直接写死成1个线程。下面是测试结果:
Netty极限1000连接
结论
Netty稳如狗!
代码更新
在本次的实践中,我对Netty-WebSocket的实现又做了一批更新,主要增加WebSocket接口路径和消息处理闭包功能,并且偷偷修复了BUG。
WebSocketConnector
package com.funtester.socket.netty import com.funtester.frame.execute.ThreadPoolUtil import groovy.util.logging.Log4j2 import io.netty.bootstrap.Bootstrap import io.netty.channel.* import io.netty.channel.group.ChannelGroup import io.netty.channel.group.DefaultChannelGroup import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.codec.http.DefaultHttpHeaders import io.netty.handler.codec.http.HttpClientCodec import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.websocketx.* import io.netty.handler.stream.ChunkedWriteHandler import io.netty.util.concurrent.GlobalEventExecutor @Log4j2 class WebSocketConnector { static Bootstrap bootstrap = new Bootstrap() /** * 处理事件的线程池 */ static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N")) static { bootstrap.group(group).channel(NioSocketChannel.class) } /** * 用于记录和管理所有客户端的channel */ static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) WebSocketClientHandshaker handShaker ChannelPromise handshakeFuture String host int port String path /** * 网络通道 */ Channel channel WebSocketIoHandler handler /** * WebSocket协议类型的模拟客户端连接器构造方法 * * @param serverIp * @param serverSocketPort * @param group */ WebSocketConnector(String host, int port, String path, Closure closure = null) { this.host = host this.port = port this.path = path String URL = this.host + ":" + this.port + path URI uri = new URI(URL) handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())) if (closure != null) handler.closure = closure bootstrap.option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline() pipeline.addLast(new HttpClientCodec()) pipeline.addLast(new ChunkedWriteHandler()) pipeline.addLast(new HttpObjectAggregator(1024 * 1024)) pipeline.addLast(handler) } }) } /** * 连接 */ void connect() { try { try { ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync() this.channel = future.channel() clients.add(channel) } catch (e) { log.error("创建channel失败", e) } } catch (Exception e) { log.error("连接服务失败", e) } finally { this.handshakeFuture = handler.handshakeFuture() } } /** * 发送文本消息 */ ChannelFuture sendText(String msg) { channel.writeAndFlush(new TextWebSocketFrame(msg)) } /** * 发送ping消息 */ ChannelFuture ping() { channel.writeAndFlush(new PingWebSocketFrame()) } /** * 关闭 */ void close() { group.shutdownGracefully() } }WebSocketIoHandler
package com.funtester.socket.netty import groovy.util.logging.Log4j2 import io.netty.channel.* import io.netty.channel.group.ChannelGroup import io.netty.channel.group.DefaultChannelGroup import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.websocketx.* import io.netty.handler.timeout.IdleState import io.netty.handler.timeout.IdleStateEvent import io.netty.util.concurrent.GlobalEventExecutor /** * WebSocket协议类型的模拟客户端IO处理器类 */ @Log4j2 class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> { /** * 用于记录和管理所有客户端的channel */ private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) private final WebSocketClientHandshaker handShaker Closure closure private ChannelPromise handshakeFuture WebSocketIoHandler(WebSocketClientHandshaker handShaker) { this.handShaker = handShaker } ChannelFuture handshakeFuture() { return handshakeFuture } @Override void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise() } @Override void channelActive(ChannelHandlerContext ctx) { handShaker.handshake(ctx.channel()); } @Override void channelInactive(ChannelHandlerContext ctx) { ctx.close() try { super.channelInactive(ctx) } catch (Exception e) { log.error("channelInactive 异常.", e) } log.warn("WebSocket链路与服务器连接已断开.") } @Override void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel() if (!handShaker.isHandshakeComplete()) { try { handShaker.finishHandshake(ch, (FullHttpResponse) msg) handshakeFuture.setSuccess() } catch (WebSocketHandshakeException e) { log.warn("WebSocket Client failed to connect", e) handshakeFuture.setFailure(e) } return } WebSocketFrame frame = (WebSocketFrame) msg if (frame instanceof TextWebSocketFrame) { if (closure != null) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame closure(textFrame.text()) } } else if (frame instanceof CloseWebSocketFrame) { log.info("WebSocket Client closing") ch.close() } } @Override void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause) if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause) } ctx.close() super.exceptionCaught(ctx, cause) } @Override void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt // 如果写通道处于空闲状态,就发送心跳命令 if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) { // 发送心跳数据 def channel = ctx.channel() channel.writeAndFlush(new TextWebSocketFrame("dsf")) } } else { super.userEventTriggered(ctx, evt) } } }
标签: #netty 连接数