前言:
眼前大家对“java如何实现实时消息推送”都比较关注,姐妹们都想要学习一些“java如何实现实时消息推送”的相关内容。那么小编同时在网摘上搜集了一些有关“java如何实现实时消息推送””的相关文章,希望姐妹们能喜欢,我们快快来学习一下吧!结合实际的场景来把netty这个框架运行起来,一起去梳理这个过程,里面用到了nio和Reactor,nio实现了对应的API,但是它没有对多线程进行结合,大牛才设计出来reactor这个模式,来实现高性能的nio的编程,经过梳理才到了netty,reactor一定要搞懂。推送系统先别管是什么推送系统,先理解成一个客户端和服务端的一个程序,也先别管具体的业务场景,功能的属性比较弱,推送系统本身就是比较简单一个推送系统,里面也没有增删查改复杂。默认大家都理解长连接和短连接,网络请求的基本概念。
(一)设计和思路
① 介绍
客户端可能通过自定义的协议,或者是app应用,需要跟推送服务器建立一个连接,推和拉的区别是推是服务器主动像客户端发起请求,往往这个技术很难实现的,主动推数据需要建立一条网络通道,服务器才可以完成推送,不让它也不知道是那个客户端,一定是客户端主动和推送服务器建立了连接socket,一般的情况是通过拉的模式来完成推送,涉及到一些socket的技术点。归根到底就是数据交互,TCP连接的方式,客户端和服务端时间的交互。一个客户端跟推送服务器连接,10个,100个,1000个,百万个连接怎么办?不管程序如何优化始终是需要有上限的。有上限肯定是多台,推送系统是多台。网络请求如何形成集群呢?
推送API对外开放的,个推,极光,飞鸽,移动互联网应用开发,对外提供一套API,统一的。用户注册发短信,与农户注册发通知,订单中心,聊天系统。业务系统都是统一的。消息不可能直接达到推送系统,中间需要存在一个消息队列,消息队列进行存储,用到一个中间件(消息中间件或者数据库)推送系统和消息队列产生信息的交互,在将对应的消息推送给指定的client。如果是个网络连接就意味着,当前这个用户只会给一个系统建立连接,目前推送系统只有5台。推送系统是集群的。所以clients要跟推送系统中间添加一个负载均衡,这种中心化的nginx不是需要考虑的,正常的系统是中间添加一个push-server-dispatch 对用户接口(分派接口的),根据请求返回消息推送服务器的地址。
② 客户端和推送系统之前的push-server-dispatch
push-server-dispatch 就类似网络的DNS的服务器,咱们平常登录网页的时候填写的是域名,通过DNS告诉我们域名所在的IP,直接访问对应的IP地址。
推送服务注册更新在push-server-dispatchuserId发送请求给push-server-dispatch推送系统返回一个服务地址client 跟推送服务建立连接(中间不会存在nginx来进行通信的,如果中间添加了负载均衡,还需要在通过负载均衡确定某一个推送服务,量大的话都无法使用,只能进行1VS1的)肯定有老铁说zookeeper,这里先不说zookeeper的事情。③ 简单的数据包的扭转
SocketServer
import java.io.IOException;import java.io.InputStream;import java.net.ServerSocket;import java.net.Socket;public class SocketServer { public static void main(String[] args) throws IOException, Exception { // server ServerSocket serverSocket = new ServerSocket(9999); // 获取新连接 while (true) { final Socket accept = serverSocket.accept(); // accept.getOutputStream().write("推送实例"); InputStream inputStream = accept.getInputStream(); while (true) { byte[] request = new byte[220]; int read = inputStream.read(request); if (read == -1) { break; } // 得到请求内容,解析,得到发送对象和发送内容 String content = new String(request); if(content.getBytes().length > 220) { // TODO } else if(content.getBytes().length < 220) { } // 每次读取到的数据,不能够保证是一条完整的信息 System.out.println(content); } } }}
SocketClient
import java.io.IOException;import java.io.OutputStream;import java.net.Socket;import java.util.concurrent.CountDownLatch;public class SocketClient { public static void main(String[] args) throws Exception { Socket socket = new Socket("localhost", 9999); OutputStream outputStream = socket.getOutputStream(); // 消息长度固定为 220字节,包含有 // 1. 目标用户ID长度为10, 10 000 000 000 ~ 19 999 999 999 // 2. 消息内容字符串长度最多70。 按一个汉字3字节,内容的最大长度为210字节 byte[] request = new byte[220]; byte[] userId = "10000000000".getBytes(); byte[] content = "加油!!学习netty推送服务!学习netty推送服务!学习netty推送服务!".getBytes(); System.arraycopy(userId, 0, request, 0, 10); System.arraycopy(content, 0, request, 10, content.length); CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { new Thread(() -> { try { outputStream.write(request); } catch (IOException e) { e.printStackTrace(); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); Thread.sleep(2000L); // 两秒后退出 socket.close(); }}④ 连接的方式
1.短连接
请求、响应之后,关闭已经建立的TCP连接,下次请求再建立一个连接(浏览器)。里面有keeplive保持连接的特性。
2.长连接
请求、响应之后,不关闭TCP连接,多次请求,复用同一个连接。存在一个问题,数据链都是在一个通道里面,你的也好,我的也好,都在一个通道,请求过来响应过去,不管请求和响应都是数据包在流转,数据包流转。
为了避免频繁创建连接/释放连接带来的性能损耗,以及消息获取的实时性,采用长连接的形式。
交互中存在的问题
发送一条消息(12345),在发送一条消息(66666),犹豫网络卡顿了,或者是发送卡顿了,或者一些不明原因,接收到的消息是123 66666 45 ,服务端接收到的消息是1条,而不是发送了2次的2条消息,发送2条消息,理论上是2条数据,但是在实际的传输过程中,变成了1条数据。
⑤ 发送流程
当发送方发送数据的时候,操作兄底层,并不是直接通过网线就直接出去了,操作系统有个发送的缓冲区,接收方有个也有个缓冲区,接收方从缓存中读取数据。这里面就会涉及到一个粘包和拆包的问题。
1.粘包《发送方》
操作系统接收到发送缓冲区,可能会判断目前的数据太小了,等一下发,等第二个进入缓冲区的时候的才完成下一步的发送,这里面有个算法的【Nagle】,算法就是为了做这一件事,就是为了提高网络效率,而去做的事情,举个例子:一个人去坐车可能车不开,非要达到多少人了车才开。这是为了提高网络的性能。这就是沾在一起的情况。
2.拆包《发送方》
一下发送了,5,6条数据过来,数据量太大了,太多了一下发不完,发不了。5条数据拿出来一半来发,把这一半发过去,剩下一半,每个链接有自己专属的缓冲区,不会存在冲突。
3.粘包《接收方》
接收到进入接收缓冲区,没有立刻处理accapt了,马上下个也来了,缓存冲里面的数据有堆积,读的时候发现缓冲区里面的数据怎么这么多了。一个一个读,发现好几个沾在一起。
4.拆包《接收方》
最后读数据发现读出来了很多空的。因为数据被拆了。
上边这块就是网络编程肯定会出现这种情况,怎么样解决呢。消息发送是有协议的,比如Http协议,可以通过协议本身发现数据是否完整,比方说,数据被拆分了,被沾在一起了,这些都是可以通过数据的内容发现出来的。每次都判断接收到的是否满足数组的长度,
(二)netty如何解决上边的粘包拆包的问题
① 示例展示
XNettyServer
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LineBasedFrameDecoder;public class XNettyServer { public static void main(String[] args) throws Exception { // 1、 线程定义 // accept 处理连接的线程池 EventLoopGroup acceptGroup = new NioEventLoopGroup(); // read io 处理数据的线程池 EventLoopGroup readGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptGroup, readGroup); // 2、 选择TCP协议,NIO的实现方式 b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 3、 职责链定义(请求收到后怎么处理) ChannelPipeline pipeline = ch.pipeline(); // TODO 3.1 增加解码器 // pipeline.addLast(new XDecoder()); // TODO 3.2 打印出内容 handdler pipeline.addLast(new XHandller()); } }); // 4、 绑定端口 System.out.println("启动成功,端口 9999"); b.bind(9999).sync().channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); readGroup.shutdownGracefully(); } }}
XHandller
import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 后续处理handdler */@ChannelHandler.Sharablepublic class XHandller extends ChannelInboundHandlerAdapter { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 输出 bytebuf ByteBuf buf = (ByteBuf) msg; byte[] content = new byte[buf.readableBytes()]; buf.readBytes(content); System.out.println(Thread.currentThread()+ ": 最终打印"+new String(content)); ((ByteBuf) msg).release(); // 引用计数减一 // ctx.fireChannelRead(); } // 异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
XDecoder
import java.util.List;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;// 编解码一定是根据协议~public class XDecoder extends ByteToMessageDecoder { static final int PACKET_SIZE = 220; // 用来临时保留没有处理过的请求报文 ByteBuf tempMsg = Unpooled.buffer(); // in输入 --- 处理 --- out 输出 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes()); // in 请求的数据 // out 将粘在一起的报文拆分后的结果保留起来 // 1、 合并报文 ByteBuf message = null; int tmpMsgSize = tempMsg.readableBytes(); // 如果暂存有上一次余下的请求报文,则合并 if (tmpMsgSize > 0) { message = Unpooled.buffer(); message.writeBytes(tempMsg); message.writeBytes(in); System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes()); } else { message = in; } // 2、 拆分报文 // 这个场景下,一个请求固定长度为3,可以根据长度来拆分 // i+1 i+1 i+1 i+1 i+1 // 不固定长度,需要应用层协议来约定 如何计算长度 // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并 // dubbo rpc协议 = header(16) + body(不固定) // header最后四个字节来标识body // 长度 = 16 + body长度 // 0xda, 0xbb 魔数 int size = message.readableBytes(); int counter = size / PACKET_SIZE; for (int i = 0; i < counter; i++) { byte[] request = new byte[PACKET_SIZE]; // 每次从总的消息中读取220个字节的数据 message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理 out.add(Unpooled.copiedBuffer(request)); } // 3、多余的报文存起来 // 第一个报文: i+ 暂存 // 第二个报文: 1 与第一次 size = message.readableBytes(); if (size != 0) { System.out.println("多余的数据长度:" + size); // 剩下来的数据放到tempMsg暂存 tempMsg.clear(); tempMsg.writeBytes(message.readBytes(size)); } }}② 流程梳理netty读取数据触发,2个请求数据被合并了数据解析(编解码),数据规范,http或者自己写的都是可以解析出来,解析过将合并变成一个一个的请求。请求拆分,根据协议,编解码都是自己来写的。decode将输入的数据进行处理,在输出到后面的环节。存在一个等待的过程长度不够,先保存下来。够了在处理。交给Xhandler来进行处理。
自研过于复杂,采用合适的开源协议(XMPP/MQTT/WebSocket)
PS:下节一起完成下代码编写,websocket完成的推送代码实现。
标签: #java如何实现实时消息推送 #java消息推送服务