龙空技术网

「Spring Boot 集成应用」RocketMQ的集成用法(上)

Mirson架构之道 304

前言:

而今看官们对“apache如何使用spring”可能比较看重,各位老铁们都需要剖析一些“apache如何使用spring”的相关知识。那么小编同时在网络上搜集了一些关于“apache如何使用spring””的相关内容,希望看官们能喜欢,兄弟们快快来了解一下吧!

1. RocketMQ集成介绍

在金融互联网领域广泛应用,在阿里双11活动经历过多次考验, 经过严苛的生产验证,有比较高的可靠性,在数据处理上有比较高的稳定性, 能从最大程度上保证消息不易丢失,如果业务上有一定的规模, 且对数据的一致性,稳定性要求严苛, 那么可以采用RocketMQ, 比如金融互联网领域, 支付场景、交易场景等。如果有借助消息队列实现分布式事务, RocketMQ可以作为首选。

Spring Boot 官方提供了spring-boot-starter-activemq 对ActiveMQ的支持, 但并没有提供对RocketMQ的支持, 这不代表Spring Boot 本身不支持, RocketMQ 官方给我们提供了RocketMQ-Spring 框架, 整合了RocketMQ与Spring Boot, 主要提供3个特性:

使用 RocketMQTemplate 用来统一发送消息,包括同步、异步发送消息和事务消息@RocketMQTransactionListener 注解用来处理事务消息的监听和回查@RocketMQMessageListener 注解用来消费消息2. RocketMQ安装说明

简要安装说明, 详情请参考官方文档:

1、下载RocketMQ 安装文件

采用其他镜像下载:

2、启动 NameServer

nohup sh bin/mqnamesrv &tail -f ~/logs/rocketmqlogs/namesrv.log

3、启动Broker

nohup sh bin/mqbroker -n localhost:9876 &tail -f ~/logs/rocketmqlogs/broker.log
3. RocketMQ集成配置

采用RocketMQ官方提供得rocketmq-spring-boot-starter作为集成组件。

1、创建spring-boot-mq-rocket父级工程

MAVEN依赖:

<properties>        <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version></properties><dependencies>    <!-- RocketMq与Spring Boot 集成组件依赖-->    <dependency>        <groupId>org.apache.rocketmq</groupId>        <artifactId>rocketmq-spring-boot-starter</artifactId>        <version>${rocketmq-spring-boot-starter-version}</version>    </dependency></dependencies>

2、创建rocketmq-basic工程

工程依赖直接继承父级依赖, 无须添加其他依赖组件。

工程配置:

application.yml文件:

server:  port: 12613spring:  application:    name: rocketmq-basic# RocketMQ配置rocketmq:  name-server: 10.10.20.15:9876  producer:    group: basic-group

配置填写RocketMQ地址信息, 如果是集群,多个以逗号分割。

3、创建启动类

com.mirson.spring.boot.mq.rocket.basic.startup.RocketMqBasicApplication

@SpringBootApplication@ComponentScan(basePackages = {"com.mirson"})public class RocketMqBasicApplication {    public static void main(String[] args) {        SpringApplication.run(RocketMqBasicApplication.class, args);    }}

扫描包含com.mirson包下所有路径。

4. RocketMQ集成之普通消息处理

1、定义监听器

com.mirson.spring.boot.mq.rocket.basic.consume.StringConsumer:

@Service@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC, consumerGroup = RabbitMqConfig.CONSUME_GROUP_STRING)@Log4j2public class StringConsumer implements RocketMQListener<String> {    @Override    public void onMessage(String message) {        log.info("StringConsumer => receive: " + message);    }}
订阅的主题为RabbitMqConfig.TOPIC, 订阅的分组为RabbitMqConfig.CONSUME_GROUP_STRING。实现RocketMQListener接口, 将接收的消息通过日志打印。

2、提供接口

com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

@RestController@Log4j2public class RocketMqProviderContorller {    @Resource    private RocketMQTemplate rocketMQTemplate;    /**     * 生产者发送字符类型消息     * @return     */    @GetMapping("/sendString")    public String sendString() {        String msg = "random number: " + RandomUtils.nextInt(0, 100);        // Send string        SendResult sendResult = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC, msg);        log.info("send result: " + sendResult.getSendStatus());        return msg;    }    ...}

提供sendString接口, 每次请求发送一个随机数, 通过RocketMQTemplate的syncSend同步方法发送数据。

如果发送成功, 会返回状态: SEND_OK。

3、调用验证

访问接口地址:查看监听器日志

可以看到, String类型的普通消息监听器, 正常接收到消息。

5. RocketMQ集成之原生消息处理

RocketMQ原生消息,除了发送的数据, 还可以获取RocketMQ内置的系统信息, 比如消息ID, 主机名称,时间戳, 队列信息等。

1、定义监听器

com.mirson.spring.boot.mq.rocket.basic.consume.MessageExtConsumer

@Service@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_EXT,  selectorExpression = "tag1", consumerGroup = RabbitMqConfig.CONSUME_GROUP_EXT)@Log4j2public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener{    @Override    public void onMessage(MessageExt message) {        log.info("MessageExtConsumer => receive msgId:{}, msgData:{} ", message.getMsgId(), new String(message.getBody()));    }    /**     * 自定义消费者的开始位置,这里设置的是当前时间     * @param consumer     */    @Override    public void prepareStart(DefaultMQPushConsumer consumer) {        // set consumer consume message from now        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));    }}
订阅的主题为RabbitMqConfig.TOPIC_EXT, 订阅的Group为RabbitMqConfig.CONSUME_GROUP_EXT。打印接收到的消息ID与数据。实现RocketMQPushConsumerLifecycleListener接口, 可以自定义消费者的开始消息位置, 这里设置的是当前时间。

2、提供发送接口

com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller, 增加接口:

/**     * 发送RocketMQ 原生消息     * @return     */@GetMapping("/sendStringExt")public String sendStringExt() {    String msg = "random number: " + RandomUtils.nextInt(0, 100);    try {        SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_EXT + ":tag1", msg);        log.info("result:  " + result.getSendStatus());    }catch(Exception e) {        log.error(e.getMessage(), e);    }    // Send String Ext Message    return msg;}
发送一个随机数, 增加了一个tag1标记, 与上面RocketMQMessageListener注解中的selectorExpression需保持一致, 如不匹配, 不能收到对应消息。与正常发送方式没有差异, 不需做额外处理, 仍采用同步方式发送。

3、测试验证

访问接口:

查看监听器日志6. RocketMQ集成之Spring Message消息

Spring Message 是一种消息传输规范, RocketMQ可以支持, 在Spring Cloud Stream 中采用的就是Spring Message作为消息传输规范, 这是一个用于构建基于消息的微服务应用框架。

1、定义传输对象

在实际消息交互当中, 不会传输简单的数据结构, 一般传递的是业务对象,这里定义一个订单对象:

com.mirson.spring.boot.mq.rocket.basic.bo.Order

@Datapublic class Order implements Serializable {    private static final long serialVersionUID = -1L;    /**     * 订单ID     */    private String orderId;    /**     * 创建时间     */    private Date createDate;}

消息交互当中, 默认会通过序列化传递, 需要实现序列化接口。

2、定义监听器

com.mirson.spring.boot.mq.rocket.basic.consume.OrderSpringMessageConsumer

@Service@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE, consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE)@Log4j2public class OrderSpringMessageConsumer implements RocketMQListener<Order> {    @Override    public void onMessage(Order order) {        log.info("OrderSpringMessageConsumer => receive order: " + order);    }}
定义不同的主题以区分, 这里订阅的主题为RabbitMqConfig.TOPIC_SPRING_MESSAGE, 组别为RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE。实现RocketMQListener接口, 泛型为Order; 打印接收到的订单数据。

3、定义发送接口

com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

/**     * 发送RocketMQ Spring Message封装消息     * @return     */@GetMapping("/sendSpringMessage")public String sendSpringMessage() {    String msg = "random number: " + RandomUtils.nextInt(0, 100);    Order order = new Order();    order.setOrderId(UUID.randomUUID().toString());    order.setCreateDate(new Date());    // Send Spring Message With Order    SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE, MessageBuilder.withPayload(order).build());    log.info("send result: " + result.getSendStatus());    return msg;}
创建一个订单对象, 生成UUID作为订单ID, 设置订单创建时间。采用同步方式发送, 指定主题RabbitMqConfig.TOPIC_SPRING_MESSAGE, 注意, Spring Message封装采用MessageBuilder, 将订单放入playload包体里面,调用build方法进行序列化。

4、测试验证

调用发送接口

查看监听器日志

能够正常接收并打印出完整的订单数据。

标签: #apache如何使用spring