龙空技术网

RocketMQ消费端Push和Pull两种消费方式:推模式开发示例

架构笔记 104

前言:

如今大家对“push方法”大约比较珍视,看官们都想要了解一些“push方法”的相关内容。那么小编同时在网络上网罗了一些关于“push方法””的相关文章,希望各位老铁们能喜欢,姐妹们快快来了解一下吧!

在RocketMQ中消息消费者分为两种:PUSH 和 PULL,大多数场景使用的是 PUSH 模式。这两种模式分别对应的是 DefaultMQPushConsumer 类和 DefaultLitePullConsumer(DefaultMQPullConsumer 已废弃)类。

PUSH 模式实际上在内部还是使用的 PULL 方式实现的,通过 PULL 不断地轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起 PULL 请求(5秒后再check),直到有新消息产生才取消挂起,返回新消息。

在 PUSH 方式中是通过“长轮询”方式达到 PUSH 效果,长轮询方式既有 PULL 的优点,又兼具 PUSH 方式的实时性。

PUSH与PULL 消费模式比较:

PUSH模式可控性不高,几乎近实时的拉取消息,拉取到消息马上推送监听器进行业务处理。PULL模式可控性高,可以自主决定何时提交offset,何时进行消息拉取以及可等到大批量消息时再处理。

一、消费客户端DefaultMQPushConsumer简介

DefaultMQPushConsumer 客户端的区别在于,它注册了一个 MessageListener(消息监听器)。

RocketMQ暴露给开发者使用的基于Push模式的默认生产者类,和DefaultMQProducer一样,它也仅仅是一个外观类,基本没有业务逻辑,几乎所有操作都转交给生产者实现类DefaultMQPushConsumerImpl完成。这么做的好处是RocketMQ屏蔽了内部实现,方便在后续的版本中随时更换实现类,而用户无感知。

客户端拉取到消息之后,会提交消费请求ConsumeRequest,交给线程池(ConsumeMessageService)去负责调度。RocketMQ提供了两个实现类,分别是ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,前者用来并发消费消息,后者用来消费有序消息。

在消息拉取时,会创建PullCallback对象,消息拉取完成后会触发对应的回调,在回调方法里如果拉取到了消息会触发消息消费流程。

如果消费成功,会记录消费位点,然后定时上报给Broker。如果消费失败,会调用sendMessageBack方法将消息发回给Broker,不过不是发送到原来的队列,而是统一发送到重试队列里,等待二次投递。

二、DefaultMQPushConsumer 开发实践

在示例项目中创建DemoPushConsumer 消费类,如图所示:

DemoPushConsumer 消费类的代码如下所示:

public class DemoPushConsumer { public static void main(String[] args) throws MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(AppConstant.PUSH_CONSUMER_GROUP); // 设置NameServer的地址 consumer.setNamesrvAddr(AppConstant.NAME_SERVER); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 // 订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。 consumer.subscribe(AppConstant.PULLPUSH_TOPIC, "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); }}

消息发送者仍然沿用之前创建的MessageProducer 消费生产者。

三、启动应用

启动应用,通过RocketMQ Dashboard可视化界面可查看与Consumer Group 连接的客户端列表,如图所示。

Push 推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现,所以 Push 称为被动消费类型(Consume Passively)。

单击操作列中的【终端】查看详情,可查看消费者详情,如图所示:

点击【消息】菜单项,可以查看指定Topic 中的消息列表,点击【消息详情】可以查看消息详细内容,如图所示:

标签: #push方法