龙空技术网

Flink中,State TTL 功能具体要怎么用?

散文随风想 54

前言:

此刻看官们对“ttl卸载系统应用”大体比较关注,兄弟们都想要学习一些“ttl卸载系统应用”的相关知识。那么小编在网摘上网罗了一些对于“ttl卸载系统应用””的相关内容,希望大家能喜欢,我们一起来学习一下吧!

使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。

例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从 Flink 1.6 版本开始引入了 State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理。

State TTL 功能的用法

在 Flink 的官方文档 中给我们展示了State TTL的基本用法,用法示例如下:

import org.apache.flink.api.common.state.StateTtlConfig;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(Time.seconds(1))

.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

.build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);

stateDescriptor.enableTimeToLive(ttlConfig);

TTL配置有以下几个选项:newBuilder的第一个参数表明数据的有效,是必选项

TTL 的更新策略(默认是OnCreateAndWrite):

StateTtlConfig.UpdateType.OnCreateAndWrite - 只在创建和写入时更新StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新

数据在早期但尚未被清除时的可见性配置如下(默认为NeverReturnExpired):

StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清除的数据

NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。

ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

注意:

状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。暂时只支持基于 processing time 的 TTL。尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。State TTL 当前在 PyFlink DataStream API 中还不支持。

标签: #ttl卸载系统应用