龙空技术网

Java-WebSocket vs Netty-WebSocket 资源占用

FunTester 546

前言:

目前同学们对“netty 连接数”大概比较关注,你们都想要分析一些“netty 连接数”的相关知识。那么小编也在网络上搜集了一些对于“netty 连接数””的相关内容,希望小伙伴们能喜欢,兄弟们快快来学习一下吧!

在进行WebSocket协议连接或者WebSocket接口测试的时候,一旦遇到超大连接数量的场景中时,之前使用过的实现 Java-WebSocket 以及 Netty-WebSocket 两种实现就会显示出巨大的性能差距。当然 Netty-WebSocket 就是为了解决性能问题而来的。

so,今天我就来展示一下两个 WebSocket 实现在使用中具体的差异,本文集中在资源占用上,特别是线程占用。

理论差异Java-WebSocket

据可靠资料显示,两者的差异主要以在管理 WebSocket 连接时使用的线程数不同,以下是使用org.java_websocket.client.WebSocketClient创建WebSocket客户端时,它会创建以下几个线程:

「ConnectThread(连接线程)」:当你调用WebSocketClient.connect()方法时,WebSocket客户端会创建一个单独的线程来处理连接建立的过程。这个线程负责建立实际的WebSocket连接。「WriteThread(写线程)」:WebSocket客户端还会创建一个单独的线程,用于发送WebSocket消息。当你调用WebSocket.send()方法发送消息时,消息将被发送到这个线程,然后由该线程负责将消息写入到底层的WebSocket连接中。「ReadThread(读线程)」:WebSocket客户端会创建一个用于接收WebSocket消息的线程。这个线程会持续监听来自WebSocket服务器的消息,并在接收到消息时触发相应的事件处理器。

这些线程的存在使得WebSocket客户端能够在后台处理连接、发送和接收消息,而不会阻塞主线程。这有助于确保应用程序在与WebSocket服务器进行通信时能够保持响应性。

据资料显示不同版本的实现线程是不一样的,这里我没有找到具体的版本差异,也没有进行测试。

Netty-WebSocket

Netty其实并不存在上面这个问题,因为WebSocket连接和线程数并没有强的绑定关系。Netty只有一个处理事件的 io.netty.channel.EventLoopGroup 需要使用线程池设计,其他均没有设置线程和创建线程的设置。

被测服务

这里我用Go写了一个 WebSocket 的服务端,一来省事儿,二来性能高足以应付接下来的测试。服务端代码如下:

// CreateServer  // @Description: 重建一个WebSocket服务  // @param port 端口  // @param path 路径  func CreateServer(port int, path string) {       var upgrader = websocket.Upgrader{        ReadBufferSize:   1024,        WriteBufferSize:  1024,        HandshakeTimeout: 5 * time.Second,     }       http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {        conn, _ := upgrader.Upgrade(w, r, nil)        conn.WriteMessage(websocket.TextMessage, []byte("msg"))          for {           msgType, msg, err := conn.ReadMessage()           if err != nil {              log.Println(err)              return           }           fmt.Printf("%s receive: %s\n", conn.RemoteAddr(), string(msg))             if err = conn.WriteMessage(msgType, msg); err != nil {              log.Println("ffahv")              return           }        }     })       http.ListenAndServe(":"+strconv.Itoa(port), nil)  }
单链接对比空Java进程

首先测试一下空的Java进行消耗的线程数详情,测试客户端如下:

import com.funtester.frame.SourceCode    class Empty extends SourceCode{        static void main(String[] args) {          waitForKey("按任意键退出")      }          }

运行时,进行监控:

空Java进行

Java-WebSocket

只创建1个WebSocket客户端,测试代码如下:

package com.funtest.websocketimport com.funtester.frame.SourceCodeimport com.funtester.socket.WebSocketFunClientclass WebSocket extends SourceCode {    static String url = "ws://localhost:12345/test"    static void main(String[] args) {        def instance = WebSocketFunClient.getInstance(url)        instance.connect()        instance.send("Hello FunTester")        waitForKey("按任意键退出")    }}

运行线程监控:

WebSocket单线程

Netty-WebSocket

逻辑同上,代码如下:

package com.funtest.websocketimport com.funtester.frame.SourceCodeimport com.funtester.socket.netty.WebSocketConnectorimport groovy.util.logging.Log4j2@Log4j2class NettySocket extends SourceCode {    static void main(String[] args) {        String serverIp = "ws://127.0.0.1";        int serverPort = 12345;        def h = {String x ->            log.info("收到消息:{}", x)        }        WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test",h)        client.connect()        client.getHandshakeFuture().get()        client.sendText("Hello FunTester").get()        waitForKey("按任意键退出")    }}

运行时线程监控:

Netty-WebSocket

结论

Java-WebSocket额外创建了3个线程,而Netty-WebSocket额外创建了1个线程。这里我采取了默认的 io.netty.channel.EventLoopGroup 创建策略。

1000连接Netty-WebSocket

测试代码如下:

package com.funtest.websocket    import com.funtester.frame.SourceCode  import com.funtester.socket.netty.WebSocketConnector  import groovy.util.logging.Log4j2    @Log4j2  class NettySocket extends SourceCode {        static void main(String[] args) {          String serverIp = "ws://127.0.0.1";          int serverPort = 12345;          def h = {String x ->              log.info("收到消息:{}", x)          }          1000.times {              WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, "/test", h)              client.connect()              client.getHandshakeFuture().get()              client.sendText("Hello FunTester").get()          }          waitForKey("按任意键退出")      }  }

运行时线程监控:

Netty1000连接

Java-WebSocket

由于创建实在太慢了,我测试了100个连接,测试代码如下:

package com.funtest.websocket    import com.funtester.frame.SourceCode  import com.funtester.socket.WebSocketFunClient    class WebSocket extends SourceCode {        static String url = "ws://localhost:12345/test"          static void main(String[] args) {          100.times {              fun {                  def instance = WebSocketFunClient.getInstance(url)                  instance.connect()                  instance.send("Hello FunTester")              }          }        waitForKey("按任意键退出")      }  }

运行时线程监控:

WebSocket1000连接

Netty极限

如果我们只是单纯测试连接数量的话,并没有必要创建很多处理WebSocket事件的线程,我们可以直接写死成1个线程。下面是测试结果:

Netty极限1000连接

结论

Netty稳如狗!

代码更新

在本次的实践中,我对Netty-WebSocket的实现又做了一批更新,主要增加WebSocket接口路径和消息处理闭包功能,并且偷偷修复了BUG。

WebSocketConnector

package com.funtester.socket.netty    import com.funtester.frame.execute.ThreadPoolUtil  import groovy.util.logging.Log4j2  import io.netty.bootstrap.Bootstrap  import io.netty.channel.*  import io.netty.channel.group.ChannelGroup  import io.netty.channel.group.DefaultChannelGroup  import io.netty.channel.nio.NioEventLoopGroup  import io.netty.channel.socket.SocketChannel  import io.netty.channel.socket.nio.NioSocketChannel  import io.netty.handler.codec.http.DefaultHttpHeaders  import io.netty.handler.codec.http.HttpClientCodec  import io.netty.handler.codec.http.HttpObjectAggregator  import io.netty.handler.codec.http.websocketx.*  import io.netty.handler.stream.ChunkedWriteHandler  import io.netty.util.concurrent.GlobalEventExecutor    @Log4j2  class WebSocketConnector {        static Bootstrap bootstrap = new Bootstrap()        /**       * 处理事件的线程池       */      static EventLoopGroup group = new NioEventLoopGroup(ThreadPoolUtil.getFactory("N"))        static {          bootstrap.group(group).channel(NioSocketChannel.class)      }        /**       * 用于记录和管理所有客户端的channel       */    static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)        WebSocketClientHandshaker handShaker        ChannelPromise handshakeFuture        String host        int port        String path        /**       * 网络通道       */      Channel channel        WebSocketIoHandler handler        /**       * WebSocket协议类型的模拟客户端连接器构造方法       *       * @param serverIp       * @param serverSocketPort       * @param group       */    WebSocketConnector(String host, int port, String path, Closure closure = null) {          this.host = host          this.port = port          this.path = path          String URL = this.host + ":" + this.port + path          URI uri = new URI(URL)          handler = new WebSocketIoHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()))          if (closure != null) handler.closure = closure          bootstrap.option(ChannelOption.TCP_NODELAY, true)                  .option(ChannelOption.SO_KEEPALIVE, true)                  .handler(new ChannelInitializer<SocketChannel>() {                        @Override                      protected void initChannel(SocketChannel ch) throws Exception {                          ChannelPipeline pipeline = ch.pipeline()                          pipeline.addLast(new HttpClientCodec())                          pipeline.addLast(new ChunkedWriteHandler())                          pipeline.addLast(new HttpObjectAggregator(1024 * 1024))                          pipeline.addLast(handler)                      }                  })      }          /**       * 连接       */      void connect() {          try {              try {                  ChannelFuture future = bootstrap.connect(this.host - "ws://" - "wss://", this.port).sync()                  this.channel = future.channel()                  clients.add(channel)              } catch (e) {                  log.error("创建channel失败", e)              }          } catch (Exception e) {              log.error("连接服务失败", e)          } finally {              this.handshakeFuture = handler.handshakeFuture()          }      }        /**       * 发送文本消息       */      ChannelFuture sendText(String msg) {          channel.writeAndFlush(new TextWebSocketFrame(msg))      }        /**       * 发送ping消息       */      ChannelFuture ping() {          channel.writeAndFlush(new PingWebSocketFrame())      }        /**       * 关闭       */      void close() {          group.shutdownGracefully()      }    }
WebSocketIoHandler
package com.funtester.socket.netty    import groovy.util.logging.Log4j2  import io.netty.channel.*  import io.netty.channel.group.ChannelGroup  import io.netty.channel.group.DefaultChannelGroup  import io.netty.handler.codec.http.FullHttpResponse  import io.netty.handler.codec.http.websocketx.*  import io.netty.handler.timeout.IdleState  import io.netty.handler.timeout.IdleStateEvent  import io.netty.util.concurrent.GlobalEventExecutor    /**   * WebSocket协议类型的模拟客户端IO处理器类   */  @Log4j2  class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {        /**       * 用于记录和管理所有客户端的channel       */    private ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)        private final WebSocketClientHandshaker handShaker        Closure closure        private ChannelPromise handshakeFuture        WebSocketIoHandler(WebSocketClientHandshaker handShaker) {          this.handShaker = handShaker      }        ChannelFuture handshakeFuture() {          return handshakeFuture      }        @Override      void handlerAdded(ChannelHandlerContext ctx) {          handshakeFuture = ctx.newPromise()      }        @Override      void channelActive(ChannelHandlerContext ctx) {          handShaker.handshake(ctx.channel());      }        @Override      void channelInactive(ChannelHandlerContext ctx) {          ctx.close()          try {              super.channelInactive(ctx)          } catch (Exception e) {              log.error("channelInactive 异常.", e)          }          log.warn("WebSocket链路与服务器连接已断开.")      }        @Override      void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {          Channel ch = ctx.channel()          if (!handShaker.isHandshakeComplete()) {              try {                  handShaker.finishHandshake(ch, (FullHttpResponse) msg)                  handshakeFuture.setSuccess()              } catch (WebSocketHandshakeException e) {                  log.warn("WebSocket Client failed to connect", e)                  handshakeFuture.setFailure(e)              }              return          }            WebSocketFrame frame = (WebSocketFrame) msg          if (frame instanceof TextWebSocketFrame) {              if (closure != null) {                  TextWebSocketFrame textFrame = (TextWebSocketFrame) frame                  closure(textFrame.text())              }          } else if (frame instanceof CloseWebSocketFrame) {              log.info("WebSocket Client closing")              ch.close()          }      }        @Override      void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {          log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause)          if (!handshakeFuture.isDone()) {              handshakeFuture.setFailure(cause)          }          ctx.close()          super.exceptionCaught(ctx, cause)      }        @Override      void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {          if (evt instanceof IdleStateEvent) {              IdleStateEvent event = (IdleStateEvent) evt              // 如果写通道处于空闲状态,就发送心跳命令              if (IdleState.WRITER_IDLE == event.state() || IdleState.READER_IDLE == event.state()) {                  // 发送心跳数据                  def channel = ctx.channel()                  channel.writeAndFlush(new TextWebSocketFrame("dsf"))              }          } else {              super.userEventTriggered(ctx, evt)          }      }  }

标签: #netty 连接数