龙空技术网

在Apache Kafka中记录压缩主题

闻数起舞 1429

前言:

眼前大家对“apache图片压缩”大约比较关切,我们都想要学习一些“apache图片压缩”的相关文章。那么小编同时在网上搜集了一些关于“apache图片压缩””的相关文章,希望小伙伴们能喜欢,兄弟们一起来学习一下吧!

当我开始阅读Kafka文档时,尽管压缩日志主题似乎是一个简单的概念,但我不清楚Kafka如何在内部将它们的状态保存在文件系统中。 本月我有时间阅读有关此功能的更多信息,并希望与您分享我的理解。

TL; DR

在本文中,我将介绍Kafka中的日志压缩主题。 然后,我将向您展示Kafka如何在内部将这些主题的状态保存在文件系统中。

先决条件

我假设您已经熟悉Apache Kafka的基本概念,例如代理,主题,分区,使用者和生产者。 另外,如果要运行示例命令,则必须运行Kafka代理和Zookeeper服务器。

什么是日志压缩主题

Kafka文档说:

日志压缩是一种提供更细粒度的每个记录保留的机制,而不是提供基于时间的粗粒度保留的机制。 这个想法是有选择地删除记录,其中我们使用相同的主键进行了更新。 这样,可以确保日志至少具有每个键的最后状态。

为了简化此描述,当在分区日志中有相同密钥的Kafka的较新版本时,Kafka会删除所有旧记录。 作为示例,请考虑对日志压缩主题"最新产品价格"的以下分区:

如您首先看到的,有两个键为p3的记录。 但是由于这是一个日志压缩的主题,Kafka会在后台线程中删除较旧的记录(在下一节中将对此进行更多介绍)。 现在假设我们有一个生产者,它将新记录发送到该分区。 生产者分别用键p6,p5,p5产生3条记录:

同样,Kafka Broker中的后台线程使用键p5和p6删除了较旧的记录。 请注意,压缩日志由两部分组成:尾部和头部。 Kafka确保尾部内部的所有记录具有唯一键,因为尾部在清洁过程的前一个周期中进行了扫描。 但是头部部分可以具有重复的值。

现在,我们了解了什么是日志压缩主题,是时候使用kafka-topics工具创建它们了。

创建日志压缩主题

创建一个紧凑的主题(我将详细描述所有配置):

kafka-topics --create --zookeeper zookeeper:2181 --topic latest-product-price --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"

产生一些记录:

kafka-console-producer --broker-list localhost:9092 --topic latest-product-price --property parse.key=true --property key.separator=:>p3:10$>p5:7$>p3:11$>p6:25$>p6:12$>p5:14$>p5:17$

注意,在上面的命令中,我用:分隔了键和值。 现在使用主题:

kafka-console-consumer --bootstrap-server localhost:9092 --topic latest-product-price --property print.key=true --property key.separator=: --from-beginningp3:11$p6:12$p5:14$p5:17$

如您所见,具有重复键的记录将被删除。 p5:14 $记录未删除,我们在描述清洁过程时会看到原因。 但是我们必须首先看一下Kafka在内部如何存储消息。

区隔

分区日志是一种抽象,它使我们能够轻松使用分区内的有序消息,而不必担心Kafka的内部存储。 但是,实际上,Kafka代理将分区日志分为多个段。 段是存储在文件系统中(数据目录内部和分区目录中)的文件,其名称以.log结尾。 在下图中,分区日志分为3个部分:

分区中的最后一个段称为活动段。 仅日志的活动段可以接收新产生的消息。 我们将在压缩日志的清理过程中看到Kafka与活动段的行为。

回到我们的示例,我们可以通过以下命令查看主题分区的段文件(假设您的Kafka数据目录为/ var / lib / kafka / data):

ls /var/lib/kafka/data/latest-product-price-0/00000000000000000000.index 00000000000000000006.log00000000000000000000.log 00000000000000000006.snapshot00000000000000000000.timeindex 00000000000000000006.timeindex00000000000000000005.snapshot leader-epoch-checkpoint00000000000000000006.index

00000000000000000000.log和00000000000000000006.log是此分区的段,而00000000000000000006.log是活动段。

Kafka何时创建新细分? 一种选择是通过在主题创建期间设置segment.bytes(默认为1GB)配置。 当细分的大小大于该值时,Kafka将创建一个新的细分。 另一个选择是通过设置segment.ms,如前所述。 使用此选项,当Kafka收到生产请求时,它将检查活动段是否早于segment.ms值。 如果年龄较大,则将创建一个新的细分。 在我们的命令中,我们设置segment.ms = 100来确保每100毫秒创建一个新的段。

有趣的是,当您设置segment.ms = 100时,您的细分可能会更小。 清理过程结束后(请参阅下一节),Kafka代理将合并非活动的细分并从中创建一个很大的细分。

有关Kafka的细分和内部存储的更多信息,请阅读Kafka的Storage内部结构工作原理和Kafka Storage内部结构实用介绍文章。

清洗过程

在启动期间,Kafka代理会创建许多清理器线程,用于清理压缩日志(这些线程的数量可通过log.cleaner.threads config进行配置)。 清洁线程将不断尝试在代理中找到最肮脏的日志,然后尝试对其进行清理。 对于每个日志,它计算脏率如下:

dirty ratio = the number of bytes in the head / total number of bytes in the log(tail + head)

然后,清洁器线程选择脏率最高的日志。 该日志称为最脏的日志,如果其值大于min.cleanable.dirty.ratio配置,则将其清除。 否则,清理器线程将被阻塞数毫秒(可通过log.cleaner.backoff.ms配置)。

找到最脏的日志后,我们要查找日志中可清除的部分。 请注意,日志的某些部分不可清除,将不会被扫描:

· 活动段内的所有记录。 这就是为什么我们仍然在消费者中看到重复的p5:14 $记录的原因。

· 如果将min.compaction.lag.ms配置设置为大于0,则不会清除任何时间戳记早于该配置的段。 这些段将不会被扫描以进行压缩。

现在我们知道我们要压缩哪些记录。 从日志中的第一条记录到不可清除的第一条记录。 为了简单起见,我们假定头中的所有记录都是可清理的。

请注意,我们知道日志尾部的每个记录都有唯一的键,因为在上次清除中删除了重复项。 只有在头部有一些记录,这些记录的键在日志中不是唯一的。 为了更快地找到重复的记录,Kafka在头部创建了一个记录地图。 回到我们的示例,偏移图结构是这样的:

如您所见,Kafka创建了一个名为offset map的结构,该结构的头部每个键都具有其对应的偏移量。 如果头部重复,则Kafka使用最新的偏移量。 在上图中,键为p6的记录的偏移量为5,而最新的偏移量为p5的偏移量是7。 地图中的条目(我们不想删除最新记录)。

在压缩日志的清理过程中,不仅将删除重复的消息,而且Kafka还将删除值为null的记录。 这些记录称为墓碑。 您可以通过设置delete.retention.ms config延迟删除它们。 通过设置此配置,Kafka会检查包含该记录的段的修改时间戳记,如果修改时间小于配置值,则将保留该记录。

现在,日志变得干净了。 清洁之后,我们有了新的尾巴和新的头部! 扫描进行清洁的最后一个偏移量(在我们的示例中为旧头的最后一条记录)是新尾巴的最后偏移量。

Kafka将新磁头的起始偏移量保留在数据目录根目录中名为cleaner-offset-checkpoint的文件中。 该文件用于日志的下一个清理周期。 我们可以查看主题检查点文件:

cat /var/lib/kafka/data/cleaner-offset-checkpoint01latest-product-price 0 6

如您所见,共有三行。 第一行是文件的版本(我认为是向后兼容),第二行具有值1,该值显示该行之后将有几行(仅一行),最后一行包含压缩日志主题的名称, 分区号以及该分区的头偏移量。

结论

在本文中,我向您展示了什么是日志压缩主题,如何存储它们以及Kafka如何定期清理它们。 最后,我想指出的是,日志压缩非常适合缓存场景,在这些场景中,您只想几乎实时地保留每个记录的最新值。 假设您想在应用程序启动时构建缓存。 您只需阅读压缩的主题并构建缓存,由于Kafka会顺序读取消息,因此它比使用SQL数据库预热缓存要快得多。

您可以在Martin Kleppmann的"将数据库由内到外"一文中阅读有关此技术的更多信息。 您可能还会发现我的上一篇文章"使用Kafka和Debezium的ASP.NET Core中的Beat Cache失效"很有用,这是该技术的一种实现。

参考文献

-storage-internals-d5b544f6925fhttps://kafka.apache.org/documentation/

(本文翻译自Seyed Morteza Mousavi的文章《Log Compacted Topics in Apache Kafka》,参考:)

标签: #apache图片压缩