前言:
此刻同学们对“rocketmq常用命令”大致比较看重,小伙伴们都想要了解一些“rocketmq常用命令”的相关知识。那么小编同时在网摘上网罗了一些有关“rocketmq常用命令””的相关内容,希望大家能喜欢,我们一起来了解一下吧!在上一篇文章中,我们学习了RocketMQ的原理,以及RocketMQ中 命名服务 ServiceName 的运行流程,本篇从消息的生产、消费来理解一条消息的生命周期。
1 消息生产
在RocketMQ中,消息生产指的是 消息生产者往消息队列中写入数据的过程。因为业务场景的复杂性,RocketMQ架构设计了多种不同的写入策略。下面先讨论几种常见的场景:
同步发送:整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。
-异步发送:调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。OneWay(单向)发送:只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。延迟发送:指定延迟的时间,在延迟时间到达之后再进行消息的发送。批量发送:对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
以下是生产者实例化启动的过程:
1.1 消息发送步骤
一般情况下,我们发送消息,会使用默认的DefaultMQProducer类,经过以下几个步骤实现:
创建消息生产者Producer,并设置Producer的GroupName(生产组)。设置InstanceName(实例名称),当你的业务需要启用多个Producer的时候,使用不同的InstanceName来区分。设置NameServer地址,这样Producer才能从NameServer中得到路由信息完成其他的初始化配置,比如配置异常重试次数(降低消息丢失的可能性),通信模块初始化等。组装消息对象,指定主题Topic、Tag和消息体Message 等信息。通过NameServer获取到的Broker路由地址,将消息发送。1.2 消息发生返回状态
消息发送之后,会相应的拿到回执。返回对象中的状态(SendResult.SendStatus)有4种,如下:
FLUSH_DISK_TIMEOUT 刷盘超时
如果将Broker的刷盘策略设置成SYNC_FLUSH,那么没有在规定的时间完成刷盘则会报该错误。FLUSH_SLAVE_TIMEOUT 主从同步超时
主从模式下(也可以叫主备),Broker配置为SYNC_MASTER模式,如果没有在设定时间内完成主从同步,则会报该错误。SLAVE_NOT_AVAILABLE 未找到Slave Broker
主从模式下,且Broker配置为SYNC_MASTER,如果未找到Slave的Broker,则会报该错误。SEND_OK
表示发送成功。1.3 发送同步消息
实时同步消息是一种对可靠性、实时性要求比较高的场景,使用的也比较广泛,比如:
重要的消息通知,比如验证码,不能超过太长时间推送,那样可能失效消费记录确认数据实时处理和推送 等等
public class SyncProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testSyncGroup DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","sync", "测试同步消息".getBytes("UTF-8")); // 5、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 6、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}1.4 发送异步消息
我们知道,异步主要用于那些对实时响应不敏感的业务,可以容忍一定时间的等待,只要能达到最终一致性即可。
有时候为了在流量高峰期进行削峰和分流,缓解压力,我们经常采用异步消息的发送模式。这种业务场景也很常见,比如:
消费信息的推送,可能在你买单之后的几分钟才送达数据统计、文件打包下载等需要长耗时的任务
public class AsyncProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testAsyncGroup DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","async", "测试异步消息".getBytes("UTF-8")); // 5、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 6. 发送异步消息,SendCallback是处理异步回调的方法 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 成功回调 System.out.println("success: " + sendResult); } @Override public void onException(Throwable throwable) { // 失败回调 System.out.println("fail: " + throwable); } }); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}1.5 单向发送消息
OneWay的模式主要用在Care发送结果的场景,只要消息发送出去即完成任务,不需要对发送的状态、结果负责。常见的使用场景如
普通日志记录非核心的埋点上报等
public class OneWayProducerApplication { public static void main(String[] args) throws Exception { // 1、创建生产者producer,并指定生产者组名为 testOneWayGroup DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","oneway", "测试单向发送消息".getBytes("UTF-8")); // 5、发送消息到一个Broker producer.sendOneway(msg); // 6、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}1.6 发送延时消息
指定延迟的时间,在延迟时间到达之后再进行消息的发送。这种的使用场景也很多:
比如火车票订购,提交了一个订单就把车票给占位了,这时候可以发送一个延时确认的消息,15m 未付款,就要把该车票释放,让其他人去购买。还比如购买了电影票,可以发送一个核销信息,在电影开场前15分钟就无法退票了。1.6.1 延时时间的使用限制
延时时间并不是随意指定的,Rocket源码中指定了18种等级,分别代表不同的时间时长,如下:
// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";RocketMq不支持任意时间延时,需设置固定的延时等级,从1s到2h分别对应着等级1到18可以使用setDelayTimeLevel(int level) 方法设置延时等级,level 从 0 开始1.6.2 发送延时消息具体实现
通过下面的代码,可以得到的结果是消费的时间点比信息记录的时间点延迟了1分钟,这是因为我们在send的时候做了delay。
public class DelayProducerApplication { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException { // 1、创建生产者producer,并指定生产者组名为 testDelayGroup DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息,并指定Topic,Tag和消息体 Message msg = new Message("testTopic","delay", "测试延迟发送消息".getBytes("UTF-8")); // 5、设置延时等级4,对应1m,所以这个消息在一分钟之后发送 msg.setDelayTimeLevel(4); // 6、发送消息到一个Broker SendResult sendResult = producer.send(msg); // 7、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 8、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}1.7 发送批量消息对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。
waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。
一批次的消息总大小不应超过4MB。
public class BatchProducerApplication { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException { // 1、创建生产者producer,并指定生产者组名为 testBatchGroup DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup"); // 2、指定NameServer的地址,以获取Broker路由地址 producer.setNamesrvAddr("192.168.139.1:9876"); // 3、启动producer producer.start(); // 4、创建消息列表,并指定Topic,Tag和消息体 List<Message> messages = new ArrayList<>(); String topic = "testTopic"; messages.add(new Message(topic, "batch", "测试批量发送消息 0".getBytes("UTF-8"))); messages.add(new Message(topic, "batch", "测试批量发送消息 1".getBytes("UTF-8"))); messages.add(new Message(topic, "batch", "测试批量发送消息 2".getBytes("UTF-8"))); // 5、发送消息到一个Broker SendResult sendResult = producer.send(messages); // 6、通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 7、如果不再发送消息,关闭生产者Producer producer.shutdown(); }}1.8 如何提升消息生产的性能
消息的发送一般是经过 client发送、Broker服务器接收并处理、Broker服务器返回应答 三个步骤。
如果我们想要提高消息生产的效率,一般有如下方法:
Oneway方式发送
Oneway方式发送用在一些性能要求高,可靠性要求低的场景下,比如日志采集,非核心的埋点上报等。Oneway方式发送请求无需应答,即将数据写入客户端的Socket缓冲区就返回,不等待结果的返回。
所以这种模式是极快的,可以把发送消息时长缩短至微秒级。增加Producer的并发量,使用多个Producer同时发送
RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续数据写入文件系统。
顺序执行CommitLog让RocketMQ可以保持较高的写入性能。恰当的批量发送
对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。
对于消息体的大小也要注意不能超过4MB。
根据阿里内部调优后的性能测试报告,消息的写入性能达到90万+的TPS,我们可以朝着这个指标进行优化。
2 总结
本篇介绍了RocketMQ 消息生产与发送的几种模式:
同步发送:整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。异步发送:调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。OneWay(单向)发送:只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。延迟发送:指定延迟的时间,在延迟时间到达之后再进行消息的发送。批量发送:对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
可以根据实际的业务场景选择适当的发送模式。
为帮助开发者们提升面试技能、有机会入职BATJ等大厂公司,特别制作了这个专辑——这一次整体放出。
大致内容包括了: Java 集合、JVM、多线程、并发编程、设计模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大厂面试题等、等技术栈!
欢迎大家关注公众号【Java烂猪皮】,回复【666】,获取以上最新Java后端架构VIP学习资料以及视频学习教程,然后一起学习,一文在手,面试我有。
每一个专栏都是大家非常关心,和非常有价值的话题,如果我的文章对你有所帮助,还请帮忙点赞、好评、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!
标签: #rocketmq常用命令