龙空技术网

消息中间件应用的常见问题与方案

轻松克里斯蒂8P0J 52

前言:

眼前同学们对“kafka中的一个消费者可以属于几个消费者组”大概比较讲究,你们都想要分析一些“kafka中的一个消费者可以属于几个消费者组”的相关资讯。那么小编也在网络上汇集了一些对于“kafka中的一个消费者可以属于几个消费者组””的相关内容,希望我们能喜欢,大家快快来了解一下吧!

第 01 部分

介绍

消息队列(MQ)中间件已经流行很多年了。 在互联网应用中,我们通常可以在较大的应用中看到MQ。 目前市场上的消息中间件有很多,包括但不限于RabbitMQ、RocketMQ、ActiveMQ、Kafka(流处理中间件)等,很多开发者已经熟练使用一种或多种消息中间件。 不过,还是有一些朋友对消息中间件不是特别熟悉。 由于种种原因,他们无法深入学习和理解其原理和细节,从而在使用时可能会出现这样或那样的问题。 在这里,我们将分析消息队列中间件使用中的典型问题(包括顺序消息、可靠性保证、消息幂等性、延迟消息等)并提供一些解决方案。

第02部分

消息中间件应用背景

1.消息中间件基本思想

在单个系统中,一些业务流程可以顺序执行。 当涉及到跨系统(有时也包括系统内部)时,就会需要更复杂的数据交互(也可以理解为消息传递)。 这些数据的交互传输方式可以是同步的,也可以是异步的。 在异步数据传输的情况下,往往需要载体来临时存储和分发消息。 在此基础上专门为消息接收、存储、转发而设计开发的专业应用程序可以理解为消息队列中间件。

扩展一下:如果我们简单地用一个数据库表来记录数据,然后接受数据并存储到数据表中,然后通过计划任务分发数据表中的数据,那么我们就实现了最简单的消息系统(这是本地消息表)。

我们可以认为消息中间件的基本思想就是利用高效可靠的消息机制进行异步数据传输。 在这一基本思想的指导下,不同的信息因其所关注的场景目的不同而具有不同的功能、表现和整体设计理念。

消息队列(MQ)本身实现了一种从生产者到消费者的单向通信模型。 常用的RabbitMQ、RocketMQ、Kafka等MQ都是指实现该模型的消息中间件。 目前最常用的消息中间件主要有RabbitMQ、RocketMQ、Kafka(分布式流处理平台)、Pulsar(分布式消息流平台)。 这里我收录了两个流处理平台,其他一些早期的消息中间件已经慢慢淡出了人们的视线。 在选择服务时,我们遵循两个主要原则:最大熟悉度原则(易于运维、使用可靠)、业务契合度原则(中间件性能能够支撑业务量、满足业务功能需求)。

这些常用的消息中间件选型比较很容易找到,这里不再详细介绍。 粗略地说:Pulsar 目前的使用量不如 RabbitMQ、RocketMQ 和 Kafka。 RabbitMQ主要专注于高可靠消息,RocketMQ性能与功能并重,Kafka主要应用于大数据处理(Pulsar类似)。

2.引入消息中间件的意义

我们先通过一个例子简单介绍一下异步、解耦、削峰的意义和价值(参考下面的流程图):

对于一个用户注册接口,假设有两个业务点,分别是注册和给新用户发放福利,每个业务点需要50ms的处理逻辑。 如果我们将这两个业务流程耦合到一个接口上,总共需要100ms才能完成。 不过,在这个过程中,用户注册后,无需担心自己的权益是否立即分配。 只要注册后尽快返回数据成功,后续的新手福利业务就可以在主流程外办理。 如果剥离出来,界面主流程中只处理登录逻辑,并通过MQ推送消息,后续给新用户发放福利的逻辑则异步处理。 这样可以保证注册接口能够在50ms左右得到结果。

给新人发放福利的业务是通过异步任务慢慢处理的。 通过业务点的拆分,我们实现了解耦。 已注册的子业务增减功能点不会影响主流程。 另外,如果主业务流程在某个时刻请求并发量较高,异步的方式可以将压力分散到较长的时间段,以降低固定时间段的处理压力。 这就是流量调峰。

另外,单线程模型语言通常对消息中间件有更强的要求。 虽然多线程模型语言,或者说基于协程的语言,可以通过自身的多线程(或者协程)机制实现业务内部的异步处理,但考虑到持久化问题和管理难度,成熟的中间件更适合。 中间件适合异步数据通信,也可以实现分布式系统之间的异步数据通信。

3、消息中间件的应用场景

消息中间件的应用场景主要包括:

因此,如果您的企业有上面列出的场景,或者类似的功能和性能需求,那么赶紧引入“消息中间件”来提升您的业务绩效。

第03部分

消息中间件引入引发的一系列问题

消息中间件的引入虽然有以上的种种好处,但是在使用的时候仍然存在很多问题。 例如:

当然,我们可以针对业务开发者将上述问题细化,得到以下关键问题:

第04部分

问题方案

1.消息顺序保证

传统的消息中间件和流处理中间件一般都是为了支持顺序消息而设计的。 但根据中间件本身设计目标的不同,有不同的原理架构。 因此,我们在业务中使用中间件时,需要做出有针对性的决策。 不同的治疗方法。

以下是几种常用消息或流中间件的顺序消息设计以及使用过程中乱序问题的分析:

兔子MQ:

RabbitMQ的单队列(queue)本身可以保证消息的先进先出。 在设计上,RabbitMQ 提供的单个队列数据存储在单个代理节点上。 当镜像队列开启后,镜像队列仅作为消息的副本存在,服务仍由主队列提供。 在这种情况下,单个队列上的消费自然是顺序的。 但由于单个队列支持多个消费者同时消费,当我们让多个消费者消费统一队列上的数据时,消息就会分散到多个消费者中。 当并发较高时,多个消费者无法保证消息的处理。 顺序性。

解决方案是对于需要强制排序的消息使用同一个MQ队列,并且单个队列只启用一个消费者(为了保证并发处理的顺序,多线程也是如此)。 由此带来的单个队列吞吐量降低的问题,可以采用Kafka的设计理念来解决,为单个任务打开一组多个队列,按照固定的标识符(例如:ID)路由需要排序的消息,将它们分发到该组。 队列中,相同身份的消息进入同一个队列,单个队列使用单个消费者进行消费,从而保证了消息的顺序和吞吐量。

如图所示:

卡夫卡:

Kafka是一个流处理中间件。 在它的设计中,没有队列的概念。 消息的发送和接收取决于Topic。 单个主题可以有多个分区(partition)。 这些分区可以分散到多个broker节点上,也可以使用分区。 设置副本备份以确保高可用性。

对于同一主题,Kafka 可以有多个消费者甚至消费者组。 Kafka中的消息消费一般采用消费者组(消费者组可以消费同一主题下的消息,互不干扰)进行消费。 一个消费者组中可以有多个消费者。 当同一个consumer group消费单个topic下的多个partition时,kafka会调整consumer group内consumer和partition的消费进度和余额。 但有一点是可以保证的:单个分区只能被同一个消费者组中的一个消费者消费。

在上述设计理念下,Kafka内部保证同一分区内的消息是顺序的,但不保证Topic下的消息顺序。 当Kafka的消息生产者发送消息时,它可以选择将消息发送到哪个分区。 我们只需要把需要顺序处理的消息发送到topic下的同一个分区,就可以保证消息消费的顺序。 (多线程语言使用单消费者,多线程处理数据时需要自己保证处理的顺序,这里略过)。

火箭MQ:

您可以通过阿里云官网了解RocketMQ的一些基本概念和原理:消息队列RocketMQ是什么版本? - 消息队列RocketMQ版本-阿里云[1]。

RocketMQ的消息发送和接收也是基于Topic的。 Topic下有多个Queue,分布在一个或多个Broker上,保证消息的高性能发送和接收(有点类似于Kafka的Topic-Partition机制,但内部实现原理不同,相同)。

RocketMQ支持部分顺序消息消费,保证同一个消息队列上的消息顺序消费。 不支持全局顺序消息消费。 如果要实现某个Topic的全局顺序消息消费,可以将Topic的队列数设置为1,牺牲高可用。 具体图示请参考阿里云文档:顺序消息2.0-消息队列RocketMQ版本-阿里云[2]

2. 避免消息丢失

消息丢失需要分为三部分:消息生产者将消息发送到消息中间件时不发生消息丢失、消息从接收、存储到消息中间件被消费的过程中不发生消息丢失、无消息消息消费过程中发生丢失。 保证中间件发送的消息能够被消费而不会丢失。

生产者发送消息而不丢失消息:

消息中间件一般都有消息发送确认机制(ACK)。 对于客户端来说,只要消息发送需要ACK确认,就可以根据返回的结果来判断消息是否成功发送到中间件。 这一步通常与中间件的消息接收存储过程的设计有关。 根据中间件的设计,我们通常采取的措施有:

在具体的业务设计中,如果消息传递失败,我们可以根据业务的重要性进行相应的补偿,例如:

消息中间件消息不丢失:

数字消息中间件的消息接收和存储机制各有不同,但都会根据各自的特点进行设计,最大程度保证消息不会丢失:

RabbitMQ消息接收和存储:

RocketMQ消息接受并保存:

Kafka在接收和保存消息时的设计是:

保存积压消息的及时性问题:

Consumer消费消息不丢失:

消息ACK带来两个问题:

对于无限阻塞的问题,可以参考RocketMQ消费失败的重试机制,对消息重试进行一定的设计:

关于消息重复消费的问题请参考下一节。

3.消息重复问题(消费幂等性)

在分析常用的中间件时,我们经常发现中间件设计者将这个问题的处理委托给了中间件用户,即业务开发人员。 确实,业务消费者处理的逻辑比消息生产者复杂得多。 生产者只需要保证消息成功发送到中间件即可,而消费者则需要在消费脚本中处理各种复杂的业务逻辑。

解决消息重复消费的问题,核心是用唯一的标识符来标记某条消息是否已被处理。 有许多特定选项可用,例如:

4.消息积压处理

通常我们在引入消息中间件的时候,就已经对消息消费的产生率和消费率进行了评估和测试,并尽量做到一个平衡。 但业务中也存在一些不可预测的突发情况,可能会造成消息大量积压。 这时我们可以通过以下方式来处理:

临时紧急扩容

预防消息积压:

5. 延迟消息处理

延迟消息的功能在一些MQ中间件中实现。 延迟消息和预定消息实际上可以相互转换。

火箭MQ:

RocketMQ调度消息不支持任意时间精度(出于性能原因)。 仅支持特定级别的延迟消息。 消息延迟级别在broker端通过messageDelayLevel配置。 它内部为每个延迟级别创建对应的消息消费队列,然后创建该延迟级别对应的定时任务,从消息消费队列中拉取消息并恢复该消息的原始主题和原始消息消费队列。

兔子MQ:

RabbitMQ通常有两种方案来实现延迟消息:一是创建消息延迟死信队列,并搭配死信转发队列来实现消费延迟。 但该方法中,如果前一条消息没有达到TTL时间,即使后一条消息达到了,也不会被转发到转发队列中; 另一种是使用延迟Exchange插件(rabbitmq_delayed_message_exchange),直到达到TTL才将消息转发到转发队列。 放入相应的队列并消费。

Kafka本身不支持延迟消息或定时消息。 如果你想延迟消息,则需要使用其他解决方案。

借助数据库和计划任务实现延迟消息:

常用数据库的索引结构都支持数据的顺序索引。 借助数据库可以轻松实现随时延迟消费消息。 用一张表存储数据的消耗时间,启动一个定时任务,满足条件后提取消息,然后转发到顺序队列中处理或者直接处理(已处理需要标记,不会出现再次),但Direct处理需要考虑吞吐量和并发重复性等问题。 它不如将单个脚本转发到普通队列进行处理那么方便。 数据库支持的定时任务消息积压是可控的,但是吞吐量会受到限制。

借助 Reids 的有序列表实现延迟消息:

Reids的有序列表zset结构可以实现延迟消息。 以消息消费时间为分数,将消息添加到zset中。 使用zrangebyscore命令消费消息 #命令格式 zrangebysocre key min max withscores limit 0 1 消费最早的消息 #min max 分别代表起始分数和结束分数区间。 分别使用0和当前时间戳,可以查出消费达到时间消息#withscores表示查询的数据必须有分数。 limit 后面是查询 zrangebyscore key 0 {current timestamp} 和scores limit 0 1 的起始偏移量和数量。

当然,这个解决方案也有局限性。 首先,redis必须配置持久化,防止消息丢失(如果配置不合理,不能100%保证,但每个命令的持久化会导致性能下降,需要权衡); 其次,如果消息延迟太多,会造成消息积压,形成一个大key; 第三,需要自己平衡重复消费和消费失败(当然可以,但建议启动单次消费流程,将延迟消息转移到普通队列中消费)。

基于时间轮的任务调度:

在很多软件中,都有基于时间轮的计划任务的实现。 可以使用时间轮和多级时间轮来实现延迟任务调度。 如果我们想自己实现一个延迟任务队列,可以考虑使用该算法来实现任务调度,但是需要根据具体需求设计支持的任务的延迟上限以及调度的时间粒度(多级) 。 这里就不解释时间轮算法了。 如果您有兴趣,可以自行搜索、了解。

第05部分

总结

通过以上几节的介绍,相信您已经能够很自然地了解:消息队列和异步解耦的功能和核心思想,并对如何使用MQ来架构自己的业务有了一定的了解。 使用MQ的大部分问题只是需要我们多思考,仔细考虑细节,保证业务的高可用。 我们甚至可以从这些解决方案中提炼出一些核心,以便我们可以在业务中参考类似的思路,优化我们的业务。 例如,消息顺序保证的核心是顺序消息生产者发送到唯一的分区,然后维护单个消费者对固定分区的顺序消费; 避免消息丢失的核心是每一步的确认和降级机制; 消费幂等性的核心是唯一标识和步骤状态; 消息积压处理的核心是快速响应应急预案; 延迟消息的核心是消息排序,优化点是性能提升。

科学方法包括归纳法和演绎法。 在学习问题解决方案的过程中,应在使用中提取并推导相应的核心思想。 这些总结的知识点然后可以应用到业务中,从而更熟练地处理相应的问题。 交易和构建高可用的业务架构是我们最需要做的。

参考链接:

【1】

【2】

【3】

【5】丁伟,周继峰。 《RocketMQ技术内幕——RocketMQ架构设计与实现原理》。 机械工业出版社

【6】内哈·纳赫德、格温·夏皮拉、托德·帕里诺。 《卡夫卡权威指南》。 人民邮电出版社

今天,因为你们的喜欢,我元气满满!

标签: #kafka中的一个消费者可以属于几个消费者组