龙空技术网

「Flink实时数据分析系列」7.有状态算子和应用(下)

DatabriAI 151

前言:

眼前大家对“后端接收表单序列化的数据是什么”大体比较看重,大家都想要知道一些“后端接收表单序列化的数据是什么”的相关内容。那么小编同时在网摘上搜集了一些关于“后端接收表单序列化的数据是什么””的相关资讯,希望我们能喜欢,兄弟们一起来了解一下吧!

数据与智能 本公众号关注大数据与人工智能技术。由一批具备多年实战经验的技术极客参与运营管理,持续输出大数据、数据分析、推荐系统、机器学习、人工智能等方向的原创文章,每周至少输出10篇精品原创。同时,我们会关注和分享大数据与人工智能行业动态。欢迎关注。

来源 | 「Stream Processing with Apache Flink」

作者 | Fabian Hueske and Vasiliki Kalavri

翻译 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究

校对 | gongyouliu

编辑 | auroral-L

全文共8061字,预计阅读时间40分钟。

在「有状态算子和应用(上)」中,我们讲解了「实现有状态函数」 、「为有状态的应用开启故障恢复」这两部分,在本篇文章中我们将讲解余下部分。

目录

三、有状态应用的性能及鲁棒性

1.选择状态后端 2.选择状态原语 3.防止状态泄露四、更新有状态应用 1.保持现有状态更新应用 2.从应用中删除状态 3.修改算子的状态五、可查询式状态 1.可查询式状态服务的架构及启用方式 2.对外暴露可查询式状态 3.从外部系统查询状态❤总结

三、有状态应用的性能及鲁棒性

算子与状态交互的方式对应用程序的健壮性和性能有影响。有几个方面会影响应用程序的行为,比如选择在本地维护状态并执行检查点的状态后端存储、检查点算法的配置以及应用程序状态的大小。在本节中,我们将讨论确保长时间运行的应用程序的健壮执行行为和一致性能所需考虑的方面。

1.选择状态后端

在“State Backends(状态后端)”中,我们解释了Flink在状态后端维护应用程序状态。状态后端负责存储每个任务实例的本地状态,并在采取检查点时将其持久化到远程存储。因为本地状态可以通过不同的方式进行维护和检查,所以状态后端是可插拔的——两个应用程序可以使用不同的状态后端实现来维护它们的状态。状态后端的选择对有状态应用程序的健壮性和性能有影响。每个状态后端为不同的状态原语(如ValueState、ListState和MapState)提供实现。

目前,Flink提供三个状态后端,MemoryStateBackend, FsStateBackend和 RocksDBStateBackend:

MemoryStateBackend将状态存储为TaskManager JVM进程堆上的常规对象。例如,MapState是由Java HashMap对象支持的。虽然这种方法提供了非常低的读写状态延迟,但是它对应用程序的健壮性有影响。如果任务实例的状态变得太大,JVM和所有在其上运行的任务实例可能会由于OutOfMemoryError错误而被杀死。此外,这种方法可能会遇到垃圾收集暂停,因为它会将许多存活时间较长的对象放在堆上。当采取检查点时,MemoryStateBackend将状态发送给JobManager,后者将其存储在堆内存中。因此,应用程序的总体状态必须适合JobManager的内存。因为它的内存是易失的,所以在JobManager失败的情况下,状态会丢失。由于这些限制,仅建议将MemoryStateBackend用于开发和调试。FsStateBackend将本地状态存储在TaskManager的JVM堆上,就像MemoryStateBackend一样。但是,FsStateBackend将状态写入远程持久文件系统,而不是将状态检查点指向JobManager的易失性内存。因此,FsStateBackend为本地访问提供内存中的速度,并在出现故障时提供容错能力。但是,它受到TaskManager内存大小的限制,可能会出现垃圾收集暂停。RocksDBStateBackend将所有状态存储到本地RocksDB实例中。RocksDB是一个将数据持久化到本地磁盘的嵌入式键值存储。为了向RocksDB读写数据,需要对它进行序列化、反序列化。RocksDBStateBackend还将状态检查点存储到远程,持久化到文件系统。由于RocksDBStateBackend将数据写入磁盘并支持增量检查点(更多信息见“检查点、保存点和状态恢复”),所以对于状态非常大的应用程序来说,RocksDBStateBackend是一个不错的选择。用户场景已经使用RocksDBStateBackend的状态大小为多个TB的应用程序。但是,与在堆上维护状态相比,将数据读写到磁盘和反序列化对象的开销会导致更低的读写性能。

因为StateBackend是一个公共接口,所以也可以实现自定义状态后端。示例7-12展示了如何为应用程序及其所有有状态函数配置状态后端(这里是RocksDBStateBackend)。

val env = StreamExecutionEnvironment.getExecutionEnvironmentval checkpointPath: String = ???// configure path for checkpoints on the remote filesystemval backend = new RocksDBStateBackend(checkpointPath)// configure the state backendenv.setStateBackend(backend)

我们在“调优检查点和恢复”中讨论了如何在应用程序中使用和配置状态后端。

2.选择状态原语

有状态算子(内置的或用户定义的)的性能取决于几个方面,包括状态的数据类型、应用程序的状态后端和选择的状态原语。对于读写时反序列化状态对象的状态后端,例如RocksDBStateBackend,状态原语(ValueState、ListState或MapState)的选择可能会对应用程序的性能产生重大影响。例如,ValueState在访问时是完全反序列化的,在更新时是序列化的。RocksDBStateBackend的ListState实现在构造Iterable读取值之前反序列化所有列表条目。但是,向ListState添加单个值——将其附加到列表的末尾——是一种廉价的操作,因为只有附加的值是序列化的。RocksDBStateBackend的MapState允许对每个键读取和写入值——只有那些被读取或写入的键和值是反序列化的。在遍历MapState的条目集时,序列化的条目是预先从RocksDB获取的,只有在实际访问键或值时才反序列化。

例如,使用RocksDBStateBackend,使用 MapState[X, Y] 比使用 ValueState[HashMap[X, Y]] 更有效。如果元素经常被附加到列表中,并且列表中的元素很少被访问,那么 ListState[X] 相对于 ValueState[List[X]] 有一个优势。

另一个好的实践是每个函数调用只更新一次状态。由于检查点与函数调用是同步的,所以多个状态更新不会带来任何好处,但是当在单个函数调用中多次更新状态时,可能会导致额外的序列化开销。

3.防止状态泄露

流应用程序通常设计为连续运行数月或数年。如果应用程序的状态一直在增加,那么在某些情况下,它会变得太大并杀死应用程序,除非采取措施给应用程序扩展到更多的资源。为了防止随着时间的推移而增加应用程序的资源消耗,必须控制算子状态的大小。由于状态的处理直接影响算子的语义,所以Flink不能自动清除状态并释放存储。相反,所有有状态算子都必须控制其状态的大小,并确保其不会无限增长。

状态增长的一个常见原因是keyed state在一个不断更新的key域上。在此场景中,有状态函数接收带有键的记录,这些键仅在一段时间内是活动的,之后就再也不会接收。一个典型的例子是单击事件流,其中单击具有一段时间后过期的会话id属性。在这种情况下,带有keyed state的函数会为越来越多的键累积状态。随着键空间的发展,过期键的状态将变得陈旧和无用。此问题的解决方案是删除过期key的状态。但是,具有keyed state的函数只有在接收到具有该键的记录时才能访问键的状态。在许多情况下,函数不知道一条记录是否是键的最后一条记录。因此,它将不能为key退出状态,因为它可能会收到另一个相同key记录。

这个问题不仅存在于自定义有状态函数中,也存在于DataStream API的一些内置算子中。例如,在KeyedStream上计算运行的聚合,可以使用内置的聚合函数(如min、max、sum、minBy或maxBy),也可以使用自定义的ReduceFunction或AggregateFunction)来保持每个键的状态,并且从不丢弃它。因此,只有当键值来自常量和有界域时,才应该使用这些函数。其他例子是带有基于计数器的触发器的窗口,当接收到一定数量的记录时,这些触发器将处理并清除它们的状态。具有基于时间的触发器(处理时间和事件时间)的窗口不受此影响,因为它们根据时间触发和清除它们的状态。

这意味着在设计和实现有状态算子时,应该考虑应用程序需求及其输入数据的属性,例如键域。如果你的应用程序需要移动key域的keyed state,那么它应该确保在不再需要时清除键的状态。这可以通过注册将来某个时间点的定时器来实现。与state类似,定时器是在当前活动key的上下文中注册的。当定时器触发时,将调用回调方法并加载定时器key的上下文。因此,回调方法可以完全访问键的状态,并且可以清除它。提供注册定时器支持的函数是windows的触发器接口和处理函数。两者都在第六章中讨论过。

例7-13显示了一个KeyedProcessFunction,它比较两个后续的温度测量值,并在差异大于某个阈值时发出警报。这与之前的keyed state示例是相同的用例,但是KeyedProcessFunction也清除了键的状态(即sensors),在事件发生后一小时内没有提供任何新的温度测量。

class SelfCleaningTemperatureAlertFunction(val threshold:Double)extends KeyedProcessFunction[String, SensorReading,(String, Double, Double)] {   // the keyed state handle for the last temperature   private var lastTempState: ValueState[Double] = _   // the keyed state handle for the last registered timer   private var lastTimerState: ValueState[Long] = _   override def open(parameters: Configuration): Unit = {       // register state for last temperature       val lastTempDesc = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])       lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)       // register state for last timer       val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long])       lastTimerState =getRuntimeContext.getState(timestampDescriptor)  }   override def processElement(       reading: SensorReading,       ctx: KeyedProcessFunction      [String, SensorReading, (String, Double,Double)]#Context,       out: Collector[(String, Double, Double)]): Unit = {       // compute timestamp of new clean up timer as record timestamp + one hour       val newTimer = ctx.timestamp() + (3600 * 1000)       // get timestamp of current timer       val curTimer = lastTimerState.value()       // delete previous timer and register new timer       ctx.timerService().deleteEventTimeTimer(curTimer)       ctx.timerService().registerEventTimeTimer(newTimer)       // update timer timestamp state       lastTimerState.update(newTimer)       // fetch the last temperature from state       val lastTemp = lastTempState.value()       // check if we need to emit an alert       val tempDiff = (reading.temperature - lastTemp).abs       if (tempDiff > threshold) {           // temperature increased by more than the threshold           out.collect((reading.id, reading.temperature, tempDiff))      }       // update lastTemp state       this.lastTempState.update(reading.temperature)  }   override def onTimer(       timestamp: Long,       ctx: KeyedProcessFunction      [String, SensorReading, (String, Double,       Double)]#OnTimerContext,       out: Collector[(String, Double, Double)]): Unit = {       // clear all state for the key       lastTempState.clear()       lastTimerState.clear()  }}

上面的KeyedProcessFunction实现的状态清理机制如下所示。对于每个输入事件,调用processElement()方法。在比较温度测量值并更新最后的温度之前,该方法通过删除前一个计时器并注册一个新计时器来更新清理定时器。清理时间是通过向当前记录的时间戳添加一个小时来计算的。为了能够删除当前注册的定时器,它的时间戳存储在一个名为lastTimerState的附加ValueState[Long]中。之后,该方法比较温度,可能发出警报,并更新其状态。

因为我们的KeyedProcessFunction总是通过删除当前定时器并注册一个新定时器来更新注册的定时器,所以每个键只注册一个定时器。一旦定时器触发,就会调用onTimer()方法。该方法清除与键关联的所有状态、最后的温度和最后的定时器状态。

四、更新有状态应用

我们经常需要修复一个bug,或者更新一个长时间运行的有状态流应用程序的业务逻辑。因此,需要用更新的版本替换正在运行的应用程序,并且不能丢失应用程序的状态。

Flink通过获取正在运行的应用程序的保存点、停止保存点并从保存点启动应用程序的新版本来支持此类更新。然而,在更新应用程序的同时,保持其状态仅在某些应用程序更改时才有可能做到——原始应用程序及其新版本需要与保存点兼容。我们将解释如何在保持保存点兼容性的同时迭代应用程序。

在“保存点”章节中,我们解释了保存点中的每个状态都可以由一个复合标识符来处理,该复合标识符由一个唯一的算子标识符和由状态描述符声明的状态名组成。

在实现应用程序时要考虑到版本迭代重要的是要理解应用程序的厨师设计决定了以后是否以及如何以与保存点兼容的方式修改它。如果最初的版本没有在设计时考虑到更新,那么许多更改将是不可能的。对于大多数应用程序更改,必须将唯一标识符分配给算子。

当从保存点启动应用程序时,通过使用算子标识符和状态名从保存点查找相应的状态,初始化已启动应用程序的算子。从保存点兼容性的角度来看,这意味着一个应用程序可以通过以下三种方式迭代演进:

在不更改或删除现有状态的情况下更新或扩展应用程序的逻辑。这包括向应用程序添加有状态或无状态算子。从应用程序中删除状态。通过更改状态原语或状态的数据类型来修改现有算子的状态。

在下面的小节中,我们将讨论这三种情况。

1.保持现有状态更新应用

如果更新应用程序而不删除或更改现有状态,则应用程序始终与保存点兼容,并且可以从较早版本的保存点启动。

如果你将一个新的有状态算子添加到应用程序中,或者将一个新的状态添加到一个现有的算子中,当应用程序从一个保存点启动时,该状态将初始化为空。

注意:更改内置有状态算子(如窗口聚合、基于时间的join或异步函数)的输入数据类型,通常会修改其内部状态的类型。因此,这样的更改并不是安全点兼容的,即使它们看起来并不明显。

2.从应用中删除状态

与向应用程序添加新状态不同,你可能还希望通过删除状态来调整应用程序——可以通过删除完整的有状态算子,也可以仅从函数中删除状态。当从上一版本的保存点启动应用程序的新版本时,保存点包含不能映射到重新启动的应用程序的状态。如果算子的唯一标识符或状态名被更改,也会出现这种情况。

默认情况下,Flink不会启动不还原保存点中包含的所有状态的应用程序,以避免丢失保存点中的状态。但是,可以禁用这个安全检查,如“运行和管理流应用程序”章节中所述。因此,通过从现有算子中删除有状态算子或状态来更新应用程序并不困难。

3.修改算子的状态

虽然从应用程序中添加或删除状态相当容易,而且不影响保存点兼容性,但是修改现有算子的状态则更加复杂。有两种方法可以修改状态:

通过更改状态的数据类型,例如将ValueState[Int]更改为ValueState[Double]通过更改状态原语的类型,例如将ValueState[List[String]]更改为ListState[String]

在一些特定的情况下,可以更改状态的数据类型。但是,Flink目前不支持更改状态的原语(或结构)。通过提供转换保存点的离线工具,有一些方法法可以支持这种情况。然而,截至Flink 1.7,还没有这样的工具存在。下面我们将重点讨论如何更改状态的数据类型。

为了理解修改状态数据类型的问题,我们必须理解在保存点中状态数据是如何表示的。保存点主要由序列化状态数据组成。将状态JVM对象转换为字节的序列化器,由Flink的类型系统生成和配置。这种转换基于状态的数据类型。例如,如果你有一个ValueState[String],那么Flink的类型系统将生成一个StringSerializer来将String对象转换为字节。序列化器还用于将原始字节转换回JVM对象。根据状态后端是存储序列化的数据(如RocksDBStateBackend)还是作为堆上的对象(如FSStateBackend),这将在函数读取状态或从保存点重新启动应用程序时发生。

由于Flink的类型系统根据状态的数据类型生成序列化器,所以当状态的数据类型改变时,序列化器可能会改变。例如,如果你将ValueState[String]更改为ValueState[Double],那么Flink将创建一个DoubleSerializer来访问状态。使用DoubleSerializer反序列化用StringSerializer序列化字符串生成的二进制数据会失败,这并不奇怪。因此,仅在非常特定的情况下支持更改状态的数据类型。

在Flink 1.7中,如果数据类型定义为Apache Avro类型,并且新数据类型也是根据Avro模式演化规则从原始类型演化而来的Avro类型,则支持更改状态的数据类型。Flink的类型系统将自动生成能够读取数据类型以前版本的序列化器。

状态演化和状态迁移是Flink社区的一个重要课题,受到了广泛的关注。你可以期望在Apache Flink的未来版本中改进对这些场景的支持。尽管做了这么多工作,我们还是建议在将应用程序投入生产之前,一定要仔细检查它是否能够按照计划进行改进。

五、可查询式状态

许多流处理应用程序需要与其他应用程序共享结果。一种常见的模式是将结果写入数据库或键值存储中,并让其他应用程序从该数据存储中检索结果。这样的体系结构意味着需要建立和维护一个单独的系统,这可能是一项重大的工作,特别是如果它还需要是一个分布式系统。

Apache Flink提供了可查询的状态来处理通常需要外部数据存储来共享数据的用例。在Flink中,任何keyed state都可以作为可查询状态公开给外部应用程序,并充当只读键值存储。有状态的流应用程序像往常一样处理事件,并以可查询的状态存储和更新其中间结果或最终结果。外部应用程序可以在流应用程序runtime请求指定key的状态。

注意,只支持key的点查询。不可能请求key范围查询或甚至运行更复杂的查询。

可查询状态并不适用于所有需要外部数据存储的场景。例如,只能在应用程序runtime访问可查询状态。当应用程序由于错误而重新启动、重新调整应用程序或将其迁移到另一个集群时,它是不可访问的。但是,它使许多应用程序更容易实现,例如实时仪表板或其他监视应用程序。

接下来,我们将讨论Flink的可查询状态服务的体系结构,并解释流应用程序如何公开可查询状态,外部应用程序可以查询它。

1.可查询式状态服务的架构及启用方式

Flink的可查询状态服务包含三个进程:

QueryableStateClient:外部应用程序使用QueryableStateClient来提交查询和检索结果。QueryableStateClientProxy:接受并服务客户端请求。每个TaskManager运行一个客户端代理。由于keyed state分布在算子的所有并行实例中,因此代理需要标识为所请求的key维护状态的TaskManager。此信息是从管理key组分配的JobManager请求的,并在接收到此信息后进行缓存。客户端代理从各自TaskManager的状态服务器检索状态,并将结果提供给客户端。QueryableStateServer:服务于客户端代理的请求。每个TaskManager运行一个状态服务器,该服务从本地状态后端获取查询key的状态,并将其返回给发出请求的客户端代理。

图7-1显示了可查询状态服务的体系结构。

为了在Flink设置中启用可查询状态服务——在TaskManager中启动客户机代理和服务器线程——你需要将flink-queryable-state-runtime JAR文件添加到TaskManager进程的类路径中。这是通过将它从安装的./opt文件夹复制到./lib文件夹来实现的。当JAR文件位于类路径中时,可查询状态线程将自动启动,并可以为可查询状态客户端的请求提供服务。正确配置后,你将在TaskManager日志中发现以下日志消息:

Started the Queryable State Proxy Server @ …

客户端代理和服务器使用的端口以及其他参数可以在./conf/flink-conf.yaml文件中配置。

2.对外暴露可查询式状态

实现具有可查询状态的流应用程序很容易。你所要做的就是定义一个具有keyed state的函数,并通过在获取状态句柄之前在StateDescriptor上调用setQueryable(String)方法使状态可查询。示例7-14展示了如何使lastTempState可查询,以说明keyed state的用法。

override def open(parameters: Configuration): Unit = {// create state descriptorval lastTempDescriptor =new ValueStateDescriptor[Double]("lastTemp", classOf[Double])// enable queryable state and set its external identifierlastTempDescriptor.setQueryable("lastTemperature")// obtain the state handlelastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)}

通过setQueryable()方法传递的外部标识符可以自由选择,并且仅用于配置可查询状态客户端。

除了对任何类型的keyed state启用查询的通用方法之外,Flink还提供了定义流接收器(sink)的快捷方式,这些接收器以可查询状态的方式存储流事件。示例7-15展示了如何使用可查询状态接收器。

val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData// project to sensor id and temperature.map(r => (r.id, r.temperature))// compute every 10 seconds the max temperature per sensor.keyBy(_._1).timeWindow(Time.seconds(10)).max(1)// store max temperature of the last 10 secs for each sensor// in a queryable statetenSecsMaxTemps// key by sensor id.keyBy(_._1).asQueryableState("maxTemperature")

asQueryableState()方法的作用是:向流附加一个可查询的状态接收器。可查询状态的类型是ValueState,它保存输入流的类型值,本例中为(String,Double)。对于每个接收到的记录,可查询状态接收器将该记录保存到ValueState中,以便始终存储每个key的最新事件。

具有可查询状态的函数的应用程序与任何其他应用程序一样执行。你只需确保taskmanager被配置为启动可查询的状态服务(如前一节所述)。

3.从外部系统查询状态

任何基于jvm的应用程序都可以使用QueryableStateClient来查询正在运行的Flink应用程序的可查询状态。这个类是由flink-queryable-state-client-java dependency提供的,你可以将其添加到你的项目中,如下所示:

<dependency>   <groupid>org.apache.flink</groupid>   <artifactid>flink-queryable-state-clientjava_2.12</artifactid>   <version>1.7.1</version></dependency>

QueryableStateClient是用任一TaskManager的主机名和可查询状态客户端代理监听的端口初始化的。默认情况下,客户端代理监听端口9067,但是可以在./conf/flink-conf.yaml文件中配置:

val client: QueryableStateClient =new QueryableStateClient(tmHostname, proxyPort)

获得状态客户端后,可以通过调用getKvState()方法来查询应用程序的状态。该方法接受几个参数,比如正在运行的应用程序的JobID、状态标识符、需获取状态的key、key的类型信息和查询状态的状态描述符。JobID可以通过REST API、Web UI或日志文件获得。getKvState()方法返回一个CompletableFuture[S],其中S是状态的类型(例如,ValueState[] 或 MapState[,_ ])。因此,客户端可以发送多个异步查询并等待它们的结果。示例7-16显示了一个简单的控制台仪表板,它查询上一节中显示的应用程序的可查询状态。

object TemperatureDashboard {// assume local setup and TM runs on same machine as clientval proxyHost = "127.0.0.1"val proxyPort = 9069// jobId of running QueryableStateJob// can be looked up in logs of running job or the web UIval jobId = "d2447b1a5e0d952c372064c886d2220a"// how many sensors to queryval numSensors = 5// how often to query the stateval refreshInterval = 10000def main(args: Array[String]): Unit = {// configure client with host and port of queryable state proxyval client = new QueryableStateClient(proxyHost, proxyPort)val futures = new Array[CompletableFuture[ValueState[(String, Double)]]](numSensors)val results = new Array[Double](numSensors)// print header line of dashboard tableval header =(for (i <- 0 until numSensors) yield "sensor_" + (i + 1)).mkString("\t| ")println(header) // loop foreverwhile (true) {// send out async queriesfor (i <- 0 until numSensors) {futures(i) = queryState("sensor_" + (i + 1), client)}// wait for resultsfor (i <- 0 until numSensors) {results(i) = futures(i).get().value()._2}// print resultval line = results.map(t => f"$t%1.3f").mkString("\t| ")println(line)// wait to send out next queriesThread.sleep(refreshInterval)}client.shutdownAndWait()}def queryState(key: String,client: QueryableStateClient): CompletableFuture[ValueState[(String, Double)]] = {client.getKvState[String, ValueState[(String, Double)], (String, Double)](JobID.fromHexString(jobId),"maxTemperature",key,Types.STRING,new ValueStateDescriptor[(String, Double)]("", // state name not relevant hereTypes.TUPLE[(String, Double)]))}}

为了运行示例,你必须首先使用可查询状态启动流应用程序。一旦运行,在日志文件或web UI中查找JobID,在仪表板的代码中设置JobID并运行它。然后仪表板将开始查询正在运行的流应用程序的状态。

❤总结

几乎每个重要的流应用程序都是有状态的。DataStream API提供了强大但易于使用的工具来访问和维护算子状态。它提供了不同类型的状态原语,并支持可插入状态后端存储。虽然开发人员可以灵活地与状态交互,但是Flink的runtime可以管理tb级的状态,并确保在出现故障时使用一次语义。第6章中讨论的基于时间的计算和可伸缩状态管理的组合使开发人员能够实现复杂的流应用程序。可查询状态是一种易于使用的特性,可以节省设置和维护数据库或键值存储的工作,从而将流应用程序的结果公开给外部应用程序。

标签: #后端接收表单序列化的数据是什么