龙空技术网

Springboot2.2.6构建RabbitMQ消息接收端

星空奇点 136

前言:

现在你们对“rabbitmq jar包”大体比较关怀,各位老铁们都想要剖析一些“rabbitmq jar包”的相关知识。那么小编在网上汇集了一些关于“rabbitmq jar包””的相关文章,希望你们能喜欢,我们快快来学习一下吧!

接续之前文章

小T说:消息中间件,构建RabbitMQ消息发布端代码

小T说:AMQP协议、模型及RabbitMQ常用组件

小T说:微服务,数据一致性与RabbitMQ

小T说:消息中间件,为什么用RabbitMQ及支持的场景

大家好,我是技术人小Top

今天咱们来介绍如何使用RabbitMQ构建消息发布端 ^-^

RabbitMQ官网:

上次介绍了Spring2.26如何构建RabbitMQ消息发送端

具体到应用开发,需要使用RabbitMQ API来实现具体业务场景

现在开始进入实战

消息接收端项目pom文件

依赖jar包

消息接收端代码结构

1、yml/properties配置

包含微服务、数据库连接池、Mybatis、RabbitMQ等自动配置信息

2、config包

此处不需要任何配置类:RabbitMQ的消息接收通常是不需要配置类的,除非考虑特定需要的消息接收配置

3、control包

此处不需要Controller层:RabbitMQ的消息接收是通过监听器来完成的,不需要对外暴露接口进行消息接收交互

4、service包

包含消息接收及业务处理的业务类

完成消息接收:增加@RabbitListener和@RabbitHandler完成幂等性校验:接收到的消息,要根据业务ID做去重判断,否则很可能出现重复消费完成业务处理:消息正常接收后的业务处理

5、其他包或配置

其他包的配置不涉及RabbitMQ使用,为节省篇幅此处暂且忽略

有疑问或想了解的小伙伴可留言或讨论 ^-^

关键点说明

1、容易忽略的点

感兴趣的小伙伴们可自行测试验证,有任何想法可交流 ^-^

spring-boot-starter-amqp:2.2.6.RELEASE 与之前 2.0.x/2.1.x 版本的区别

需要在Springboot启动类上增加标注@EnableRabbit

不增加则报异常: org.springframework.amqp.AmqpException: Ambiguous methods for payload type: class [B:

异常原因:@EnableRabbit标注会在服务启动时扫描@RabbitListener标注的类或方法,并对扫描到的类或方法对应生成的代理类和代理方法进行特殊处理。如果不增加,则代理类和方法无正进行常监听。

增加@RabbitListener标注

需要在接收类或方法上增加@RabbitListener,特别需要在属性中增加ackMode="MANUAL"

只有增加了@RabbitListener,RabbitMQ的API在Springboot启动时才会监控是否有消息需要接收。

只有增加了ackMode="MANUAL",接收消息时才会进行定制化的处理。即按照我们自己的设计在消息确认接收且成功处理后,再从队列中删除消息。

设置ackMode="MANUAL"的原因:

如果不增加ackMode属性,默认ackMode="AUTO“。即RabbitMQ容器会自动判断消息是否接收成功,依据就是根据在消息接收过程中是否发生异常。默认设置可能发生一些意想不到的情况,此处仅举例说明但不包含全部

1、暴露出来的异常信息很可能是RabbitMQ异常,自己的业务异常很可能被掩盖,不能很好的定位异常原因

2、如果消息异常处理机制设计的不完善,一旦发生异常,RabbitMQ容器会自动重试处理,很可能造成消息死循环处理

@RabbitHandler标注的消息接收方法参数中需要增加Channel和Message类型参数

设置ackMode="MANUAL"后,我们在处理消息接收时,需要对成功/失败两种情况进行手动处理。

1、消息处理成功:调用API告诉RabbitMQ消息已成功处理,消息可以从队列移除/删除,此时消息不再持久化了

2、消息处理失败:调用API告诉RabbitMQ消息处理失败了,至于消息是否需要从队列移除/删除,根据自己业务场景来决定

确认消息处理成功的API

basicAck(long deliveryTag, boolean multiple) throws IOException

参数deliveryTag:给接收到消息打个标记。默认应由RabbitMQ随机生成并用来它自己区分接收到的消息。所以此处应赋值为message.getMessageProperties().getDeliveryTag()

参数multiple:是否确认消息处理成功。true代表确认当前一批消息中的所有消息,false代表仅确认deliveryTag参数传递来的当前标记的消息

补充说明:RabbitMQ对于消息接收是通过NIO来处理的,所以接收端对于消息队列中的消息是异步分批接收的。也就是说如果有1w条消息,RabbitMQ接收端会分批接收消息。

确认消息处理失败的API

basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException

参数deliveryTag:同上

参数multiple:同上

参数requeue:对于消息处理失败的消息是否重新进入消息队列。true代表不重新进入队列,false代表重新进入队列。至于选择true/false,根据自己业务场景来决定

@RabbitHandler标注的消息接收方法参数中的接收消息类型需要与发送类型匹配

@RabbitHandler标注的消息接收方法参数中的第一个参数是指定接收消息的接收对象。这个参数需要与发送消息时设定的类型匹配

不匹配则报异常: org.springframework.amqp.AmqpException: No method found for class [B

特别说明:如果发送消息的类型是String,RabbitMQ会默认使用apache/octet-stream来传输字节流。此时消息的接收类型不能是String,而必须是byte[]

2、已经忽略的点

下述情况、规范或场景未周详考虑,根据需要可逐步完善 ^-^

代码规范:类注释、方法注释、命名规范等日志记录:记录的格式、时机、粒度等前端交互:前端的约定、格式等非消息异常:捕获、处理等补偿重发:消息发送失败后的补偿小结

今天主要介绍了如何构建RabbitMQ消息接收端

小伙伴们都了解了吗?

下次小Top将继续介绍RabbitMQ开发

对于今天的内容有任何疑问或问题,欢迎留言或讨论 ^-^

本文涉及的代码

@Slf4j

@Service

//Todo: 配置从哪个Queue接收消息以及设置消息接收确认类型为手动处理

@RabbitListener(queues = "user2", ackMode = "MANUAL")

public class ReceiveUserMQImpl {

@Autowired

public UserMessageReceiveMapper userMessageReceiveMapper;

/**

* Todo: 真正的消息接收处理方法

* @param content 消息接收类型,根据消息发送类型(不固定)

* @param channel 消息接收Channel

* @param message 消息封装对象

*/

@RabbitHandler

@Transactional

public void receiveMyUserMessage(byte[] content, Channel channel, Message message) {

try {

String msg = new String(content);

log.info("receive msg: " + msg);

log.info("delivery tag: " + message.getMessageProperties().getDeliveryTag());

User user = this.getUserByJson(msg);

boolean isProcess = isProcess(user);

//Todo: 如果消息未处理过则执行业务方法

if(!isProcess) {

this.saveUserMessage(user, msg, UserMessageStatus.SEND_RECEIVE_SUCCESS);

//Todo: 此处省略业务处理逻辑

... ...

}

//Todo: 调用API确认消息接收成功

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

} catch (IOException e) {

log.error("failed to receive user from RabbitMQ.");

try {

//Todo: 调用API确认消息未接收成功

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);

} catch (IOException ex) {

ex.printStackTrace();

}

}

}

//Todo: 对接收到的消息进行幂等性判断(即之前是否已经接收过此消息了)

private boolean isProcess(User user) {

Long userId = this.userMessageReceiveMapper.findByUserId(user.getId());

return userId == null ? false : true;

}

//Todo: Json字符串转换处理

private User getUserByJson(String jsonStr) {

User user = null;

try {

user = new ObjectMapper().readValue(jsonStr, User.class);

} catch (JsonProcessingException e) {

log.error("The message can't belong to User class!");

e.printStackTrace();

}

return user;

}

//Todo: 接收端消息入库保存

private void saveUserMessage(User user, String jsonStr, UserMessageStatus status) {

log.info("It's the first handler.");

UserMessage userMessage = UserMessage.builder()

.userid(user.getId())

.message(jsonStr)

.status(status.getSend_status())

.build();

this.userMessageReceiveMapper.insert(userMessage);

}

}

本文由星空奇点原创,欢迎关注,带你一起涨知识!

标签: #rabbitmq jar包