龙空技术网

降本增效利器!趣头条Spark RSS最佳实践

云布道师 111

前言:

此刻我们对“关键字过滤 rss”大概比较注重,看官们都想要知道一些“关键字过滤 rss”的相关内容。那么小编也在网络上汇集了一些对于“关键字过滤 rss””的相关知识,希望我们能喜欢,大家快快来了解一下吧!

导读:阿里云 EMR 团队和趣头条的大数据团队共同研发了 RSS,解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 基础组件。

作者 | 王振华、曹佳清、范振

业务场景与现状

趣头条是一家依赖大数据的科技公司,在 2018~2019 年经历了业务的高速发展,主 App 和其他创新 App 的日活增加了 10 倍以上,相应的大数据系统也从最初的 100 台机器增加到了 1000 台以上规模。多个业务线依赖于大数据平台展开业务,大数据系统的高效和稳定成了公司业务发展的基石,在大数据的架构上我们使用了业界成熟的方案,存储构建在 HDFS 上、计算资源调度依赖 Yarn、表元数据使用 Hive 管理、用 Spark 进行计算,具体如图 1 所示:

图 1 趣头条离线大数据平台架构图

其中 Yarn 集群使用了单一大集群的方案,HDFS 使用了联邦的方案,同时基于成本因素,HDFS 和 Yarn 服务在 ECS 上进行了 DataNode 和 NodeManager 的混部。

在趣头条每天有 6W+ 的 Spark 任务跑在 Yarn 集群上,每天新增的 Spark 任务稳定在 100 左右,公司的迅速发展要求需求快速实现,积累了很多治理欠债,种种问题表现出来集群稳定性需要提升,其中 Shuffle 的稳定性越来越成为集群的桎梏,亟需解决。

当前大数据平台的挑战与思考

近半年大数据平台主要的业务指标是降本增效,一方面业务方希望离线平台每天能够承载更多的作业,另一方面我们自身有降本的需求,如何在降本的前提下支撑更多地业务量对于每个技术人都是非常大地挑战。熟悉 Spark 的同学应该非常清楚,在大规模集群场景下,Spark Shuffle 在实现上有比较大的缺陷,体现在以下的几个方面:

Spark Shuffle Fetch 过程存在大量的网络小包,现有的 External Shuffle Service 设计并没有非常细致的处理这些 RPC 请求,大规模场景下会有很多connection reset 发生,导致 FetchFailed,从而导致 stage 重算。Spark Shuffle Fetch 过程存在大量的随机读,大规模高负载集群条件下,磁盘 IO 负载高、CPU 满载时常发生,极容易发生 FetchFailed,从而导致 stage 重算。重算过程会放大集群的繁忙程度,抢占机器资源,导致恶性循环严重,SLA 完不成,需要运维人员手动将作业跑在空闲的Label集群。计算和 Shuffle 过程架构不能拆开,不能把 Shuffle 限定在指定的集群内,不能利用部分 SSD 机器。M*N 次的 shuffle 过程:对于 10K mapper、5K reducer 级别的作业,基本跑不完。NodeManager 和 Spark Shuffle Service 是同一进程,Shuffle 过程太重,经常导致 NodeManager 重启,从而影响 Yarn 调度稳定性。

以上的这些问题对于 Spark 研发同学是非常痛苦的,好多作业每天运行时长方差会非常大,而且总有一些无法完成的作业,要么业务进行拆分,要么跑到独有的 Yarn 集群中。除了现有面临的挑战之外,我们也在积极构建下一代基础架构设施,随着云原生 Kubernetes 概念越来越火,Spark 社区也提供了 Spark on Kubernetes 版本,相比较于 Yarn 来说,Kubernetes 能够更好的利用云原生的弹性,提供更加丰富的运维、部署、隔离等特性。但是 Spark on Kubernetes 目前还存在很多问题没有解决,包括容器内的 Shuffle 方式、动态资源调度、调度性能有限等等。我们针对 Kubernetes 在趣头条的落地,主要有以下几个方面的需求:

实时集群、OLAP 集群和 Spark 集群之前都是相互独立的,怎样能够将这些资源形成统一大数据资源池。通过 Kubernetes 的天生隔离特性,更好的实现离线业务与实时业务混部,达到降本增效目的。公司的在线业务都运行在 Kubernetes 集群中,如何利用在线业务和大数据业务的不同特点进行错峰调度,达成 ECS 的总资源量最少。希望能够基于 Kubernetes 来包容在线服务、大数据、AI 等基础架构,做到运维体系统一化。

因为趣头条的大数据业务目前全都部署在阿里云上,阿里云 EMR 团队和趣头条的大数据团队进行了深入技术共创,共同研发了 Remote Shuffle Service(以下简称 RSS),旨在解决 Spark on Yarn 层面提到的所有问题,并为 Spark 跑在 Kubernetes 上提供 Shuffle 基础组件。

Remote Shuffle Service 设计与实现Remote Shuffle Service 的背景

早在 2019 年初我们就关注到了社区已经有相应的讨论,如 SPARK-25299。该 Issue 主要希望解决的问题是在云原生环境下,Spark 需要将 Shuffle 数据写出到远程的服务中。但是我们经过调研后发现 Spark 3.0(之前的 master 分支)只支持了部分的接口,而没有对应的实现。该接口主要希望在现有的 Shuffle 代码框架下,将数据写到远程服务中。如果基于这种方式实现,比如直接将 Shuffle 以流的方式写入到 HDFS 或者 Alluxio 等高速内存系统,会有相当大的性能开销,趣头条也做了一些相应的工作,并进行了部分的 Poc,性能与原版 Spark Shuffle 实现相差特别多,最差性能可下降 3 倍以上。同时我们也调研了一部分其他公司的实现方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 开源的 Magnet,这些实现方案是首先将 Shuffle 文件写到本地,然后在进行 Merge 或者 Upload 到远程的服务上,这和后续我们的Kubernetes架构是不兼容的,因为 Kubernetes 场景下,本地磁盘 Hostpath 或者 LocalPV 并不是一个必选项,而且也会存在隔离和权限的问题。

基于上述背景,我们与阿里云 EMR 团队共同开发了 Remote Shuffle Service。RSS 可以提供以下的能力,完美的解决了 Spark Shuffle 面临的技术挑战,为我们集群的稳定性和容器化的落地提供了强有力的保证,主要体现在以下几个方面:

高性能服务器的设计思路,不同于 Spark 原有 Shuffle Service,RPC 更轻量、通用和稳定。两副本机制,能够保证的 Shuffle fetch 极小概率(低于 0.01%)失败。合并 shuffle 文件,从 M*N 次 shuffle 变成 N 次 shuffle,顺序读 HDD 磁盘会显著提升 shuffle heavy 作业性能。减少 Executor 计算时内存压力,避免 map 过程中 Shuffle Spill。计算与存储分离架构,可以将 Shuffle Service 部署到特殊硬件环境中,例如 SSD 机器,可以保证 SLA 极高的作业。完美解决 Spark on Kubernetes 方案中对于本地磁盘的依赖。

Remote Shuffle Service 的实现整体设计

Spark RSS 架构包含三个角色:Master、Worker、Client。Master 和 Worker 构成服务端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 实现了 ShuffleManager 接口)。

Master 的主要职责是资源分配与状态管理。Worker 的主要职责是处理和存储 Shuffle 数据。Client 的主要职责是缓存和推送 Shuffle 数据。

整体流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的组件),如图 2。

图 2 RSS 整体架构图

实现流程

下面重点来讲一下实现的流程:

RSS 采用 Push Style 的 shuffle 模式,每个 Mapper 持有一个按 Partition 分界的缓存区,Shuffle 数据首先写入缓存区,每当某个 Partition 的缓存满了即触发 PushData。Driver 先和 Master 发生 StageStart 的请求,Master 接受到该 RPC 后,会分配对应的 Worker Partition 并返回给 Driver,Shuffle Client 得到这些元信息后,进行后续的推送数据。Client 开始向主副本推送数据。主副本 Worker 收到请求后,把数据缓存到本地内存,同时把该请求以 Pipeline 的方式转发给从副本,从而实现了 2 副本机制。为了不阻塞 PushData 的请求,Worker 收到 PushData 请求后会以纯异步的方式交由专有的线程池异步处理。根据该 Data 所属的 Partition 拷贝到事先分配的 buffer 里,若 buffer 满了则触发 flush。RSS 支持多种存储后端,包括 DFS 和 Local。若后端是 DFS,则主从副本只有一方会 flush,依靠 DFS 的双副本保证容错;若后端是 Local,则主从双方都会 flush。在所有的 Mapper 都结束后,Driver 会触发 StageEnd 请求。Master 接收到该 RPC 后,会向所有 Worker 发送 CommitFiles 请求,Worker 收到后把属于该 Stage buffer 里的数据 flush 到存储层,close 文件,并释放 buffer。Master 收到所有响应后,记录每个 partition 对应的文件列表。若 CommitFiles 请求失败,则 Master 标记此 Stage 为 DataLost。在 Reduce 阶段,reduce task 首先向 Master 请求该 Partition 对应的文件列表,若返回码是 DataLost,则触发 Stage 重算或直接 abort 作业。若返回正常,则直接读取文件数据。

总体来讲,RSS 的设计要点总结为 3 个层面:

采用 PushStyle 的方式做 shuffle,避免了本地存储,从而适应了计算存储分离架构。按照 reduce 做聚合,避免了小文件随机读写和小数据量网络请求。做了 2 副本,提高了系统稳定性。

容错

对于 RSS 系统,容错性是至关重要的,我们分为以下几个维度来实现:

PushData 失败

当 PushData 失败次数(Worker 挂了,网络繁忙,CPU繁忙等)超过 MaxRetry 后,Client 会给 Master 发消息请求新的 Partition Location,此后本 Client 都会使用新的 Location 地址,该阶段称为 Revive。

若 Revive 是因为 Client 端而非 Worker 的问题导致,则会产生同一个 Partition 数据分布在不同 Worker 上的情况,Master 的 Meta 组件会正确处理这种情形。

若发生 WorkerLost,则会导致大量 PushData 同时失败,此时会有大量同一 Partition 的 Revive 请求打到 Master。为了避免给同一个 Partition 分配过多的 Location,Master 保证仅有一个 Revive 请求真正得到处理,其余的请求塞到 pending queue 里,待 Revive 处理结束后返回同一个 Location。

Worker 宕机

当发生 WorkerLost 时,对于该 Worker 上的副本数据,Master 向其 peer 发送 CommitFile 的请求,然后清理 peer 上的 buffer。若 Commit Files 失败,则记录该 Stage 为 DataLost;若成功,则后续的 PushData 通过 Revive 机制重新申请 Location。

数据去重

Speculation task 和 task 重算会导致数据重复。解决办法是每个 PushData的数据片里编码了所属的 mapId、attemptId 和 batchId,并且 Master 为每个 map task 记录成功 commit 的 attemtpId。read 端通过 attemptId 过滤不同的 attempt 数据,并通过 batchId 过滤同一个 attempt 的重复数据。

多副本

RSS 目前支持 DFS 和 Local 两种存储后端。

在 DFS 模式下,ReadPartition 失败会直接导致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失败会触发从 peer location 读,若主从都失败则触发 Stage 重算或 abort job。

高可用

大家可以看到 RSS 的设计中 Master 是一个单点,虽然 Master 的负载很小,不会轻易地挂掉,但是这对于线上稳定性来说无疑是一个风险点。在项目的最初上线阶段,我们希望可以通过 SubCluster 的方式进行 workaround,即通过部署多套 RSS 来承载不同的业务,这样即使 RSS Master 宕机,也只会影响有限的一部分业务。但是随着系统的深入使用,我们决定直面问题,引进高可用 Master。主要的实现如下:

首先,Master 目前的元数据比较多,我们可以将一部分与 ApplD+ShuffleId 本身相关的元数据下沉到 Driver 的 ShuffleManager 中,由于元数据并不会很多,Driver 增加的内存开销非常有限。

另外,关于全局负载均衡的元数据和调度相关的元数据,我们利用 Raft 实现了 Master 组件的高可用,这样我们通过部署 3 或 5 台 Master,真正的实现了大规模可扩展的需求。

实际效果与分析性能与稳定性

团队针对 TeraSort、TPC-DS 以及大量的内部作业进行了测试,在 Reduce 阶段减少了随机读的开销,任务的稳定性和性能都有了大幅度提升。

图 3 是 TeraSort 的 benchmark,以 10T Terasort 为例,Shuffle 量压缩后大约 5.6T。可以看出该量级的作业在 RSS 场景下,由于 Shuffle read 变为顺序读,性能会有大幅提升。

图 3 TeraSort 性能测试(RSS 性能更好)

图 4 是一个线上实际脱敏后的 Shuffle heavy 大作业,之前在混部集群中很小概率可以跑完,每天任务 SLA 不能按时达成,分析原因主要是由于大量的 FetchFailed 导致 stage 进行重算。使用 RSS 之后每天可以稳定的跑完,2.1T 的 shuffle 也不会出现任何 FetchFailed 的场景。在更大的数据集性能和SLA表现都更为显著。

图 4 实际业务的作业 stage 图(使用 RSS 保障稳定性和性能)

业务效果

在大数据团队和阿里云 EMR 团队的共同努力下,经过近半年的上线、运营 RSS,以及和业务部门的长时间测试,业务价值主要体现在以下方面:

降本增效效果明显,在集群规模小幅下降的基础上,支撑了更多的计算任务,TCO 成本下降 20%。SLA 显著提升,大规模 Spark Shuffle 任务从跑不完到能跑完,我们能够将不同 SLA 级别作业合并到同一集群,减小集群节点数量,达到统一管理,缩小成本的目的。原本业务方有一部分 SLA比 较高的作业在一个独有的 Yarn 集群 B 中运行,由于主 Yarn 集群 A 的负载非常高,如果跑到集群 A 中,会经常的挂掉。利用 RSS 之后可以放心的将作业跑到主集群 A 中,从而释放掉独有 Yarn 集群 B。作业执行效率显著提升,跑的慢→跑的快。我们比较了几个典型的 Shuffle heavy 作业,一个重要的业务线作业原本需要 3 小时,RSS 版本需要 1.6 小时。抽取线上 5~10 个作业,大作业的性能提升相当明显,不同作业平均下来有 30% 以上的性能提升,即使是 shuffle 量不大的作业,由于比较稳定不需要 stage 重算,长期运行平均时间也会减少 10%-20%。架构灵活性显著提升,升级为计算与存储分离架构。Spark 在容器中运行的过程中,将 RSS 作为基础组件,可以使得 Spark 容器化能够大规模的落地,为离线在线统一资源、统一调度打下了基础。

未来展望

趣头条大数据平台和阿里云 EMR 团队后续会继续保持深入共创,将探索更多的方向。主要有以下的一些思路:

RSS 存储能力优化,包括将云的对象存储作为存储后端。RSS 多引擎支持,例如 MapReduce、Tez 等,提升历史任务执行效率。加速大数据容器化落地,配合 RSS 能力,解决 K8s 调度器性能、调度策略等一系列挑战。持续优化成本,配合 EMR 的弹性伸缩功能,一方面 Spark 可以使用更多的阿里云 ECS/ECI 抢占式实例来进一步压缩成本,另一方面将已有机器包括阿里云 ACK、ECI 等资源形成统一大池子,将大数据的计算组件和在线业务进行错峰调度以及混部。

标签: #关键字过滤 rss