前言:
此时各位老铁们对“java模拟qq聊天”大体比较注意,我们都想要学习一些“java模拟qq聊天”的相关知识。那么小编在网上网罗了一些有关“java模拟qq聊天””的相关内容,希望大家能喜欢,小伙伴们一起来了解一下吧!我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有2个主要原因:
Netty封装了复杂的JDK 的 NIO操作,还封装了各种复杂的异常场景,丰富的API使得在使用上也非常方便,几行代码就可以实现高性能的网络通信功能。Netty已经经历各种大型中间件的生产环境的验证,高可用性和健壮性都得到了全方位验证,用起来更放心。
本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为2个部分:Netty的核心概念、IM聊天简易实现。
一、Netty核心概念1、通信流程
既然是网络通信,那肯定有服务端和客户端。在客户端-A和客户端-B通信的过程中,实际上是利用服务端作为消息中转站,来实现A-B通信的。
不管是点-点通信,还是群通信,都可以认为是客户端-服务端之间的通信,有了这一点,许多设计方案都可以轻松理解。
2、服务端核心概念Boss线程
Boss线程负责监听端口,接受新的连接,监听连接的数据读写变化。
Worker线程
Worker线程负责处理具体的业务逻辑,Boss线程接收到连接的读写变化后,然后交给Worker处理具体业务逻辑。
服务端的IO模型
Netty支持使用NIO和BIO进行通信,可以自行设置。一般使用NioServerSocketChannel来指定NIO模型。
服务端引导类
服务端通过引导类 ServerBootstrap来启动一系列的工作。
3、客户端核心概念Worker线程
客户端只有工作线程的概念,负责连接到服务端,监听数据读写变化。
客户端的IO模型
一般使用NioSocketChannel指定客户端的NIO模型
客户端引导类
客户端通过引导类Bootstrap来启动一些列工作。
4、通用核心概念Handler
负责处理接受到的消息,大部分的业务逻辑都是放在Handler里处理。自定义的Handler一般继承于SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。
ByteBuf和编码、解码
数据的载体,Java对象编码成字节码,存放于ByteBuf,然后发送出去。服务端接收到消息后,从ByteBuf中取出数据,解码成Java对象。
通讯协议
许多框架都会自定义一套自己的协议,这样比较符合业务。比如dubbo协议、hessian协议。
一般的协议包括如下部分:魔数、版本号、序列化算法、指令、数据长度、数据内容,其余的都是为了适配自身业务而定的。
魔数:一般是固定数字,用来快速判断是否符合本协议,如果不符合本协议,则快速失败。版本号:一般无需改动,如果早期设置的协议到了后续不适用了,在升级版本号。序列化算法:Java对象转序列化的方式,比如JSON。指令:操作大类。比如说登录指令、单点发送消息指令、建群指令等。这样服务端接收到对应指令就用对应的Handler去处理业务逻辑。指令占用的字节数可以根据自身业务适当调大。数据长度:用来记录本次数据的长度。数据内容:具体消息内容,比如聊天时的消息、登录时的用户名密码等。粘包拆包
Netty属于上层应用,在发送消息时,还是通过底层操作系统将数据发送出去,操作系统在发送数据时,不会按照我们设想的消息长度去发送内容。这就需要我们在接收到内容时,自行做好内容的分割和等待。
比如有一条消息1024字节,如果接受的内容没这么长就需要继续等待,等这条消息的内容完整后,在处理。如果接受的内容包含了1条完整消息和1条不完整的消息,那么就需要拆分内容,将完整的消息先传递到后面处理,剩下不完整的消息则继续等待下一个内容。
Netty自带了几种拆包器:固定长度的拆包器 FixedLengthFrameDecoder、行拆包器 LineBasedFrameDecoder、分隔符拆包器 DelimiterBasedFrameDecoder、长度域拆包器LengthFieldBasedFrameDecoder。
一般在使用自定义协议时,会使用:长度域拆包器 LengthFieldBasedFrameDecoder。
空闲检测和定时心跳
在服务端和客户端的通信过程中,有时候会出现假死连接,或者长时间没有消息传递需要释放连接。对于这些连接,我们需要及时释放,毕竟每条连接都占用着CPU和内存资源。大量这种连接如果不及时释放,服务器资源迟早会耗尽,最终崩溃。
应对这种问题的解决方式是:Netty提供了IdleStateHandler做空闲检测,用来检测连接是否活跃,如果再指定的时间内,没有活跃,那么就关闭连接。然后就是客户端定时发送心跳请求,服务器响应心跳请求。
二、IM聊天简易实现
介绍完Netty的核心概念,接下来以一个简易的点对点IM聊天,将核心概念融入到案例中。IM聊天的核心模块大致是如下几个:
1、通信主体流程
通信主体流程就是搭建好:服务端、客户端、两端正常建立连接进行通信。
服务端代码:
public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap .group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("server accept: " + msg); } }); } }); serverBootstrap.bind(9000) .addListener(future -> { if (future.isSuccess()) { System.out.println("端口9000绑定成功"); } else { System.err.println("端口9000绑定失败"); } });}
客户端代码:
public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }); bootstrap.connect("127.0.0.1", 9000) .addListener(future -> { if (future.isSuccess()) { System.out.println("链接服务端成功"); Channel channel = ((ChannelFuture) future).channel(); channel.writeAndFlush("我是客户端A"); } else { System.err.println("连接服务端失败"); } });}2、数据包—包含通讯协议
定义数据包的抽象类,后续的各种类型的数据包都继承此类。数据包中定义通讯协议的各种字段。
@Datapublic abstract class Packet { /** * 协议版本 */ private Byte version = 1; /** * 指令,此处有多种实现:比如登录、登出、单聊、建群等等 * * @return */ public abstract Byte getCommand(); /** * 获取算法,默认使用JSON,如果使用其余算法,子类重写此方法 * * @return */ public Byte getSerializeAlgorithm() { return SerializerAlgorithm.JSON; }}public class LoginRequestPacket extends Packet { private String userName; private String password; @Override public Byte getCommand() { return Command.LOGIN_REQUEST; }}3、序列化器
定义序列化器,功能包括:序列化、反序列化。可以定义多种序列化算法,文中以JSON为例。
public interface Serializer { /** * 序列化算法 * * @return */ byte getSerializerAlgorithm(); /** * java 对象转换成二进制 */ byte[] serialize(Object object); /** * 二进制转换成 java 对象 */ <T> T deserialize(Class<T> clazz, byte[] bytes);}public class JSONSerializer implements Serializer { @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSON; } @Override public byte[] serialize(Object object) { return JSON.toJSONBytes(object); } @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); }}4、编解码器
有了通讯协议、有了序列化协议,接下来就是对数据的编码和解码了。
public void encode(ByteBuf byteBuf, Packet packet) { Serializer serializer = getSerializer(packet.getSerializeAlgorithm()); // 1. 序列化 java 对象 byte[] bytes = serializer.serialize(packet); // 2. 实际编码过程 byteBuf.writeInt(MAGIC_NUMBER); byteBuf.writeByte(packet.getVersion()); byteBuf.writeByte(packet.getSerializeAlgorithm()); byteBuf.writeByte(packet.getCommand()); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes);}public Packet decode(ByteBuf byteBuf) { // 跳过 magic number byteBuf.skipBytes(4); // 跳过版本号 byteBuf.skipBytes(1); // 读取序列化算法 byte serializeAlgorithm = byteBuf.readByte(); // 读取指令 byte command = byteBuf.readByte(); // 读取数据包长度 int length = byteBuf.readInt(); // 读取数据 byte[] bytes = new byte[length]; byteBuf.readBytes(bytes); Class<? extends Packet> requestType = getRequestType(command); Serializer serializer = getSerializer(serializeAlgorithm); if (requestType != null && serializer != null) { return serializer.deserialize(requestType, bytes); } return null;}5、消息处理器Handler
以上把通讯的基本架子和收发消息的数据包、协议、编解码器等基础工具已经做完,接下来就是编写Handler实现具体的业务逻辑了。
这里以客户端发起登录功能为例,分3步,消息收发也是类似:
先在客户端发送登录请求数据包。服务端接收到登录请求数据包后,在服务端的Handler里做业务逻辑处理,然后发送响应给客户端。客户端接收到登录响应数据包后,在客户端的Handler里做业务逻辑处理。
效果如下:
核心代码如下:
客户端发送请求
bootstrap.connect("127.0.0.1", 9000) .addListener(future -> { if (future.isSuccess()) { System.out.println("连接服务端成功"); Channel channel = ((ChannelFuture) future).channel(); // 连接之后,假设再这里发起各种操作指令,采用异步线程开始发送各种指令,发送数据用到的的channel是必不可少的 sendActionCommand(channel); } else { System.err.println("连接服务端失败"); } });private static void sendActionCommand(Channel channel) { // 直接采用控制台输入的方式,模拟操作指令 Scanner scanner = new Scanner(System.in); LoginActionCommand loginActionCommand = new LoginActionCommand(); new Thread(() -> { loginActionCommand.exec(scanner, channel); }).start(); }服务端接受请求,并且处理
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) { LoginResponsePacket loginResponsePacket = new LoginResponsePacket(); loginResponsePacket.setVersion(loginRequestPacket.getVersion()); loginResponsePacket.setUserName(loginRequestPacket.getUserName()); if (valid(loginRequestPacket)) { loginResponsePacket.setSuccess(true); String userId = IDUtil.randomId(); loginResponsePacket.setUserId(userId); System.out.println("[" + loginRequestPacket.getUserName() + "]登录成功"); SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel()); } else { loginResponsePacket.setReason("校验失败"); loginResponsePacket.setSuccess(false); System.out.println("登录失败!"); } // 登录响应 ctx.writeAndFlush(loginResponsePacket);}private boolean valid(LoginRequestPacket loginRequestPacket) { System.out.println("服务端LoginRequestHandler,正在校验客户端登录请求"); return true;}客户端接受响应,并且处理
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) { String userId = loginResponsePacket.getUserId(); String userName = loginResponsePacket.getUserName(); if (loginResponsePacket.isSuccess()) { System.out.println("[" + userName + "]登录成功,userId为: " + loginResponsePacket.getUserId()); SessionUtil.bindSession(new Session(userId, userName), ctx.channel()); } else { System.out.println("[" + userName + "]登录失败,原因为:" + loginResponsePacket.getReason()); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("客户端连接被关闭!"); }}6、空闲检测和定时心跳
主流程和主要功能已经实现,还剩最后一个空闲检测和定时心跳。
实现步骤:
客户端和服务端都先定义好空闲检测。如果再规定的时间内没有数据传输,则关闭通道。客户端定时发送心跳服务端处理心跳请求,发送响应给客户端
核心代码:
空闲检测代码:
/** * IM聊天空闲检测器 * 比如:20秒内没有数据,则关闭通道 */public class ImIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 20; public ImIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!"); ctx.channel().close(); }}
客户端定时心跳代码:
public void channelActive(ChannelHandlerContext ctx) throws Exception { scheduleSendHeartBeat(ctx); super.channelActive(ctx); } private void scheduleSendHeartBeat(ChannelHandlerContext ctx) { // 此处无需使用scheduleAtFixedRate,因为如果通道失效后,就无需在发起心跳了,按照目前的方式是最好的:成功一次安排一次 ctx.executor().schedule(() -> { if (ctx.channel().isActive()) { System.out.println("定时任务发送心跳!"); ctx.writeAndFlush(new HeartBeatRequestPacket()); scheduleSendHeartBeat(ctx); } }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); }
服务端响应心跳代码:
public class ImIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 20; public ImIdleStateHandler() { super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!"); ctx.channel().close(); }}三、总结
本文介绍了Netty的核心概念,以及基本使用方法,希望能够帮到你。本文核心词:
通信流程Boss线程、Worker线程处理消息的Handler通讯协议、序列化协议、编解码器空闲检测、定时心跳
本文完整代码:
本篇完结!感谢你的阅读,欢迎点赞 关注 收藏 私信!!!
原文链接:Netty入门实践-模拟IM聊天 - 不焦躁的程序员、Netty入门实践-模拟IM聊天
标签: #java模拟qq聊天