龙空技术网

Netty从入门到秃头:websocket

大鹅啊 180

前言:

如今大家对“nettysocketio和websocket”都比较看重,我们都想要剖析一些“nettysocketio和websocket”的相关知识。那么小编在网摘上网罗了一些关于“nettysocketio和websocket””的相关内容,希望同学们能喜欢,同学们一起来学习一下吧!

1. 核心依赖

<dependencies>    <!--netty的依赖集合,都整合在一个依赖里面了-->    <dependency>        <groupId>io.netty</groupId>        <artifactId>netty-all</artifactId>        <version>4.1.6.Final</version>    </dependency></dependencies>
2. 代码2.1 启动项
public class NioWebSocketServer {    private final Logger logger=Logger.getLogger(this.getClass());    private void init(){        logger.info("正在启动websocket服务器");        NioEventLoopGroup boss=new NioEventLoopGroup();        NioEventLoopGroup work=new NioEventLoopGroup();        try {            ServerBootstrap bootstrap=new ServerBootstrap();            bootstrap.group(boss,work);            bootstrap.channel(NioServerSocketChannel.class);            bootstrap.childHandler(new NioWebSocketChannelInitializer());            Channel channel = bootstrap.bind(8081).sync().channel();            logger.info("webSocket服务器启动成功:"+channel);            channel.closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();            logger.info("运行出错:"+e);        }finally {            boss.shutdownGracefully();            work.shutdownGracefully();            logger.info("websocket服务器已关闭");        }    }    public static void main(String[] args) {        new NioWebSocketServer().init();    }}

netty搭建的服务器基本上都是差不多的写法:

绑定主线程组和工作线程组,这部分对应架构图中的事件循环组只有服务器才需要绑定端口,客户端是绑定一个地址配置channel(数据通道)参数,重点就是ChannelInitializer的配置以异步的方式启动,最后是结束关闭两个线程组2.2 ChannelInitializer写法

public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel ch) {        ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程        ch.pipeline().addLast("http-codec",new HttpServerCodec());//设置解码器        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));//聚合器,使用websocket会用到        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());//用于大数据的分区传输        ch.pipeline().addLast("handler",new NioWebSocketHandler());//自定义的业务handler    }}
2.3 自定义的处理器NioWebSocketHandler
public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {    private final Logger logger=Logger.getLogger(this.getClass());    private WebSocketServerHandshaker handshaker;    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        logger.debug("收到消息:"+msg);        if (msg instanceof FullHttpRequest){            //以http请求形式接入,但是走的是websocket                handleHttpRequest(ctx, (FullHttpRequest) msg);        }else if (msg instanceof  WebSocketFrame){            //处理websocket客户端的消息            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //添加连接        logger.debug("客户端加入连接:"+ctx.channel());        ChannelSupervise.addChannel(ctx.channel());    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        //断开连接        logger.debug("客户端断开连接:"+ctx.channel());        ChannelSupervise.removeChannel(ctx.channel());    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();    }    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){        // 判断是否关闭链路的指令        if (frame instanceof CloseWebSocketFrame) {            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());            return;        }        // 判断是否ping消息        if (frame instanceof PingWebSocketFrame) {            ctx.channel().write(                    new PongWebSocketFrame(frame.content().retain()));            return;        }        // 本例程仅支持文本消息,不支持二进制消息        if (!(frame instanceof TextWebSocketFrame)) {            logger.debug("本例程仅支持文本消息,不支持二进制消息");            throw new UnsupportedOperationException(String.format(                    "%s frame types not supported", frame.getClass().getName()));        }        // 返回应答消息        String request = ((TextWebSocketFrame) frame).text();        logger.debug("服务端收到:" + request);        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()                + ctx.channel().id() + ":" + request);        // 群发        ChannelSupervise.send2All(tws);        // 返回【谁发的发给谁】        // ctx.channel().writeAndFlush(tws);    }    /**     * 唯一的一次http请求,用于创建websocket     * */    private void handleHttpRequest(ChannelHandlerContext ctx,                                   FullHttpRequest req) {        //要求Upgrade为websocket,过滤掉get/Post        if (!req.decoderResult().isSuccess()                || (!"websocket".equals(req.headers().get("Upgrade")))) {            //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));            return;        }        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(                "ws://localhost:8081/websocket", null, false);        handshaker = wsFactory.newHandshaker(req);        if (handshaker == null) {            WebSocketServerHandshakerFactory                    .sendUnsupportedVersionResponse(ctx.channel());        } else {            handshaker.handshake(ctx.channel(), req);        }    }    /**     * 拒绝不合法的请求,并返回错误信息     * */    private static void sendHttpResponse(ChannelHandlerContext ctx,                                         FullHttpRequest req, DefaultFullHttpResponse res) {        // 返回应答给客户端        if (res.status().code() != 200) {            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),                    CharsetUtil.UTF_8);            res.content().writeBytes(buf);            buf.release();        }        ChannelFuture f = ctx.channel().writeAndFlush(res);        // 如果是非Keep-Alive,关闭连接        if (!isKeepAlive(req) || res.status().code() != 200) {            f.addListener(ChannelFutureListener.CLOSE);        }    }}

执行流程是:

web发起一次类似是http的请求,并在channelRead0方法中进行处理,并通过instanceof去判断帧对象是FullHttpRequest还是WebSocketFrame,建立连接是时候会是FullHttpRequest在handleHttpRequest方法中去创建websocket,首先是判断Upgrade是不是websocket协议,若不是则通过sendHttpResponse将错误信息返回给客户端,紧接着通过WebSocketServerHandshakerFactory创建socket对象并通过handshaker握手创建连接在连接创建好后的所以消息流动都是以WebSocketFrame来体现在handlerWebSocketFrame去处理消息,也可能是客户端发起的关闭指令,ping指令等等2.4 保存客户端的信息

当有客户端连接时候会被channelActive监听到,当断开时会被channelInactive监听到,一般在这两个方法中去保存/移除客户端的通道信息,而通道信息保存在ChannelSupervise中:

public class ChannelSupervise {    private   static ChannelGroup GlobalGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    private  static ConcurrentMap<String, ChannelId> ChannelMap=new ConcurrentHashMap();    public  static void addChannel(Channel channel){        GlobalGroup.add(channel);        ChannelMap.put(channel.id().asShortText(),channel.id());    }    public static void removeChannel(Channel channel){        GlobalGroup.remove(channel);        ChannelMap.remove(channel.id().asShortText());    }    public static  Channel findChannel(String id){        return GlobalGroup.find(ChannelMap.get(id));    }    public static void send2All(TextWebSocketFrame tws){        GlobalGroup.writeAndFlush(tws);    }}

ChannelGroup是netty提供用于管理web于服务器建立的通道channel的,其本质是一个高度封装的set集合,在服务器广播消息时,可以直接通过它的writeAndFlush将消息发送给集合中的所有通道中去。但在查找某一个客户端的通道时候比较坑爹,必须通过channelId对象去查找,而channelId不能人为创建,所有必须通过map将channelId的字符串和channel保存起来。

结尾

感谢看到最后的朋友,都看到最后了,点个赞再走啊,如有不对之处还请多多指正。

标签: #nettysocketio和websocket #websocket断开原因boos