前言:
眼前你们对“rocketmq发送消息的类型”大约比较着重,朋友们都想要剖析一些“rocketmq发送消息的类型”的相关内容。那么小编同时在网上汇集了一些有关“rocketmq发送消息的类型””的相关内容,希望大家能喜欢,兄弟们快快来学习一下吧!Apache RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。
RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
本文将介绍Apache RocketMQ 中的消息类型。
一、按照发送消息的方式分类
按照发送消息的方式分类:
同步消息异步消息单向消息
(1)同步消息
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
(2)异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。
应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
(3)单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
二、 按照消息功能特点分类
按照消息功能特点分类:
普通消息顺序消息延时消息和定时消息事务消息
(1)普通消息
也称并发消息,并发消息没有顺序,生产和消息都是并行进行的,单机性能可达到10万级别的TPS。
普通消息是指消息队列RocketMQ中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。
(2)顺序消息
顺序消息分为两类:全局顺序和分区顺序。
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
在RocketMQ 中,分别提供了顺序同步、顺序异步、顺序单向的方式。
(3)延时消息和定时消息
延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
要注意的是延时的时间是需要按照默认配置的延时级别去配置的,而不是随意设置消息的延时时间。
默认 Broker服务器端有18个定时级别,每一个级别分别对应不同的延迟时间。
默认的level定义:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个配置下标从1开始 比如级别2是延时5秒、级别5是延时1分钟。
和普通的消息不同之处在于Producer在发送消息的时候 需要设置message.setDelayTimeLevel();延迟级别方法。其他参数和消费端的写法并无不同之处。
示例代码如下所示:
//延时的级别为3 对应的时间为10s 就是发送后延时10S在把消息投递出去message.setDelayTimeLevel(3);
延迟队列的实现思路:
producer发出消息broker在准备将消息写入存储的时候,延时消息会更改Message的topic为延时消息队列的topic,也就是将消息投递到延时消息队列。不同延迟级别、对应不同队列。定时线程不断读取队列,延迟时间到了,就转换为普通的消息,存到真实的topic下。此时consumer才能看到并消费该消息
定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
定时消息设置的是绝对时间:setStartDeliverTime()
其实定时消息实现原理比较简单,如果一个topic对应的消息在发送端被设置为定时消息,那么会将该消息先存放在topic为SCHEDULE_TOPIC_XXXX的消息队列中,并将原始消息的信息存放在commitLog文件中,由于topic为SCHEDULE_TOPIC_XXXX,所以该消息不会被立即消息,然后通过定时扫描的方式,将到达延迟时间的消息,转换为正确的消息,发送到相应的队列进行消费。
定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
定时消息和延时消息的使用在代码编写上存在略微的区别:
发送定时消息需要明确指定消息发送时间点之后的某一时间点作为消息投递的时间点。
发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
(4)事务消息
通过发送 Half 消息、处理本地事务、提交消息或者回滚消息,优雅地实现分布式事务。
事务消息:消息队列RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列RocketMQ事务消息能达到分布式事务的最终一致。
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势:
消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
事务消息交互流程如下图所示:
事务消息发送步骤如下:
发送方将半事务消息发送至消息队列RocketMQ服务端。消息队列RocketMQ服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。发送方开始执行本地事务逻辑。发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
三、RocketMQ 消费模式
RocketMQ的消费模式有2种:
在默认情况下,就是集群消费(CLUSTERING);另一种消费模式,是广播消费(BROADCASTING)。
(1)集群模式
同一个Consumer Group下的多个consumer平摊消息队列中的消息,例如三个消费者处于同一个group下,且订阅了同一个topic,加入生产者往消息队列中放入了这个topic的6条消息,那么消费者消费消息的总和为6条,消费完的消息不能被其他实例所消费。默认是集群模式。
(2)广播模式
指的是consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中Consumer Group概念可以认为在消息划分方面无意义。
需要设置消息消费模式为广播模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
标签: #rocketmq发送消息的类型