龙空技术网

ClickHouse在京东流量分析的应用实践

闪念基因 3234

前言:

此时我们对“spark vs clickhouse”大约比较关心,咱们都想要了解一些“spark vs clickhouse”的相关资讯。那么小编在网上收集了一些对于“spark vs clickhouse””的相关内容,希望我们能喜欢,朋友们一起来了解一下吧!

前言

ClickHouse 是一款开源列式存储的分析型数据库,相较业界OLAP数据库系统,其最核心优势就是极致的查询性能。它实现了向量化执行和SIMD指令,对内存中的列式数据,一个batch调用一次SIMD指令,大幅缩短了计算耗时,带来数倍的性能提升。目前国内社区火热,各大厂也纷纷进入该技术领域的探索。

引言

本文主要讨论京东黄金眼业务在离线流量数据分析上,如何利用ClickHouse进行探索和实践。近些年,海量数据联机分析的热度与日俱增,从分布式Mysql、Presto、Impala大规模离线、实时计算遇到瓶颈,再到Apache Druid、Apache Doris、ClickHouse等实时分析型数据库的广泛应用,离线、实时计算引擎百花齐放。但在应对差异性业务场景中的问题解决中,没有哪一种引擎是万能的。本文谨以京东黄金眼业务在离线、实时分析的应用实践,希望能够给到大家一些启发,也欢迎大家多多交流,给我们提出宝贵的建议。

黄金眼业务形态

京东集团使命——“技术为本,致力于更高效和可持续的世界”。黄金眼作为集团内经营数据分析的重要入口,为众多采销与运营提供分析型数据看板,向其制定经营策略、复盘经营成果提供了重要数据支撑。但集团数以万计的采销岗位变化、组织架构变化、品牌、品类等属性变化,会导致数据库中历史的离线岗位、属性信息不准确;商家和自营sku关联信息需要实时更新,每日更新数据量千亿以上,诸如此类因素会对推数和查询产生巨大影响。这意味着,高效稳定的推数工具和查询架构对数据看板的准确、高效显得尤为重要。

Easy OLAP 设计

1、 EasyOLAP ClickHouse 数据链路

Easy OLAP 数据链路

Easy OLAP 数据源主要为实时JDQ(kafka topic)、离线Hive数据。依赖ClickHouse官方 JDBC ,实时数据支持通过Flink 导入,离线数据主要使用Spark job 导入。

2、 EasyOLAP ClickHouse 全链路监控

Easy OLAP ClickHouse 监控链路

目前Easy OLAP ClickHouse 项目,主要依赖node_exporter 采集暴露机器层面指标和Clickhouse Prometheus port 暴露ClickHouse 服务层面指标,Prometheus 采集,时序数据库存储。

值得一提的是,Grafana支持直接链接ClickHouse server查询,我们为ClickHouse 定制了很多依赖system库下的表而建立了很多全局的分布式表,供Grafana展示。

3、 Easy OLAP ClickHouse HA

ClickHouse HA

概念:

Cluster: 单个clickhouse 物理集群

Group: 单个 clickhouse 逻辑组,在clickhouse group内实现数据备份,集群节点扩缩容Group 为最小单位,实现最小粒度的数据迁移工作

Node: 单台物理机器

Instance: 单个ClickHouse Server 进程

shard_replica: 图中 0_0、0_1、0_2 .. 表示相同shard内不同replica

设计亮点:

1. 设计逻辑Group概念,在单个Group内副本达到高可用。

2. 依赖ClickHouse storage_policy 设计单台机器多实例、多磁盘部署。

3. Group 为集群扩缩容最小单元,集群在扩缩容情况下,可达到最小数据迁移甚至不迁移的情况。

4. 可满足单个SQL使用集群所有资源。

ClickHouse推数工具

目前HDFS to ClickHouse业内主要使用Waterdrop,但存在环境依赖、配置繁琐、无法聚合等问题,为了满足敏捷开发快速接入的要求,我们用Spark编写了ClickHouse自动推数工具,只需配置CK数据源信息、表名、分区、索引即可完成Buffalo(调度)推数,如需裁剪数据、更改结构、转换数据、聚合数据增加参数即席发布,ClickHouse推数程序主要包含三部分内容:

1、 集群检查

元数据补全:推数前检测CK所有节点上system.tables是否已存在表、表结构是否一致,如不存在,根据hive表schema信息,字段类型自动映射成CK表类型,支持自定义分区、表字段函数生成新的DataFrame,on Cluster方式创建CK本地表和分布式表。

集群负载:调用集群接口或者直接访问集群物理机,获取集群中是否存在cpu低于80%、内存低于70%、负载低于峰值60%副本集合,如存在,则按资源等级动态降低并发,直至暂停推数,以保障推数期间ck集群稳定使用。

删除数据检查:因CK删除分区存在一定延时性,超过1G分区先detachPartition再在空闲时段dropDetachPartition,低于1G的小分区,删除后通过count计算分布式表查看数据是否已删除完成。

元数据更新:CK更新本地表,根据用户新加字段类型、修改字段类型、删除字段类型,执行相应DDL SQL on cluster,完成更新,分布式表自动删除并映射新的本地表创建分布式表。

2、 数据传输

数据切分:有2种方式,一种离线数据生成一个uuid,hash_key_for_shard可以直接hash这个字段,这种会有一个影响,sql聚合的数据分布在不同的节点,增加IO开销。另一种coalesce(cast(abs(hash(A,B ,离散度高的字段)) as int) %集群分片数,0),A B为索引字段,当索引字段的离散度不高时,增加一个或者多个离散度高的字段,保证所有节点的数据是均衡的,同时命中索引的情况下,数据在一个节点,减少网络开销,提升查询效率。

并发推数:推数前由于hive可能是多分区的情况下,需要重新分区并合并小文件,减少多批次小文件导数sqlContext.sql(sql_inserttable.stripMargin).toDF().repartition(5).saveToClickHouse(dbName = targetdatabase, tableName = s"${targettable}_local", index = Integer.parseInt(shardnum))并发数取集群分片数的1/4,写入线程数"spark.dynamicAllocation.maxExecutors", "4"。这样5个推数任务,同时推数的情况下:任务数5*并发分片数10*写入线程数4/集群分片数40<=15,每个节点同时写入的线程低于15对查询影响较小,多副本的情况下,动态选择负载小的副本集合推数。

异常处理:在hive中正确的数据,ck中可能无法导入,捕获异常数据的同时,打印出来并进行计数,上游优化数据或者舍弃ck无法导入的数据,Counter值,在最后的验证环节提供数据支撑,保证源数据量=ck中数据量+异常数据量。

3、 数据验证

数据量检查:推入ck的数据量=源数据量-异常数据量

指标合理值检查:选择需要验证的指标,uv、pv等指标除双11、618期间,增长一般不超过大促峰值,超过阈值告警,上游需检查数据是否正确。

空值检查:当数据量为0推数任务报错,多字段为空时任务告警离线确认。

基于Spark开发的CK推数程序,每日新增数据超过数千亿行,峰值导入每分钟数亿行,副本同步一分钟以内,重要数据,多个副本同时推,保证各副本数据一致性。统一服务接入多个应用并提供查询优化、结果合并、安全限流等保障,目前按日多维查询秒级内,查询qps达到30+,服务稳定,0级业务提供独立保障。

ClickHouse查询架构

1、聚合物化视图

异步物化视图,在原有的分布式表和本地表中把需要去重的字段类型更改为A AggregateFunction(uniq, String),uniq还可以设置为uniqCombined,uniqCombined64,uniqHLL12,uniqExact。根据业务需要选择不同的精度,聚合数据后insert的时候uniqState(A),查询时使用uniqMerge(A),从明细到聚合物化视图,大约可以减少70%的数据量,提高查询性能,进一步提高聚合程度的话,可以设置单指标表,查询时join。

2、字典函数

字典函数更新时可并发更新,on cluster根据集群配置情况使用,数千万的数据量,采用HASHED()格式,大约需要占用数十G的集群内存,COMPLEX_KEY_HASHED()占用的内存更多, select dictGetString('db.dict_dept','dept',tuple('1')) AS id,dictGetString(' db.dict_id_name','name',tuple(id)) AS name,字典查询结果可以作为参数传入下次字典查询。

3、ck刷岗

京东集团内数万的采销岗位、组织架构变化,品牌、品类等属性变化,数据库中历史的离线岗位、属性信息不准确,商家和自营Sku关联信息需要实时关联更新,我们采用在ck中明细数据刷岗的方案,流量明细表根据业务需求先进行预聚合,每天大约可以减少70%的数据量,刷岗信息将维表更新到字典函数里,实时加载到内存,以提高刷岗效率,每天数据刷岗时间3分钟,每日可更新数据周期6个月,无需查询时关联,单次查询性能大约提高30%,高并发情况下,大幅提高qps。

4、 多活集群Clickhouse vs Doris

ClickHouse现在的瓶颈主要在并发上不去,那么对于重要业务,可以选择多副本和多活集群,多副本集群多个节点挂会导致业务不可用,我们启动了多活集群的方案,一份数据推多个集群,统一服务查询时,根据集群配置分配不同比例下发查询到各个集群,大小查询分开,把查点的和查列表的分开,查一天和查一个月的分开,提升服务整体QPS。

统一服务根据各CK集群的负载能力,分发不同比例的查询请求,每个集群设置独立线程池,保证在瞬时高并发的情况,指标熔断、查询熔断保护CK集群,以保证业务的持续稳定运行,集群中目前主要有3种表形式:离线规范化的明细表、离线聚合的预计算表,CK中进行聚合的物化视图,还有字典表、系统表等其他工具表,在多活集群的情况下,集群版本存在差异、多运维团队的情况下。我们建立了三层缓存模型:SQL缓存、接口请求参数缓存、SQL模型缓存,缓存结果存储在JIMDB(分布式Redis)。在缓存都未命中的情况下,下发请求到CK数据源,大促备战过程中,遍历CK中system.querys中的慢查询,更改时间参数缓存模型,遍历常用部门大查询提前缓存查询结果来保证服务在瞬时高并发的情况下,稳定输出数据到实时明细、战报、JDV大屏等0级应用。

技术取舍

离线数据处理的技术实现,需要在高吞吐、高并发、准确性和成本之间进行取舍。我们采用ClickHouse作为离线仓库的聚合层,其实也是在多方面进行了平衡。例如:

1、ck推数工具的导入任务,不同分片启动独立的spark程序,启动时间间隔为各副本同步数据提供资源,每个程序启动前检查集群节点cpu、内存、和负载, 资源紧张时,低优先级的任务暂停推数,以此为查询和导数之间达到一个平衡;

2、为了降低开发成本,节省计算资源,我们通过建立单指标聚合表(类似于doris的rollup)来支持快速的查询需求,但是却增加了存储压力以及写入时的IO压力;

3、PV、UV等流量指标在聚合时采用的是HLL计算,降低了局部精度,查询响应更为高效;

以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对离线数据大规模、高吞吐、逻辑复杂等特点,离线计算的选型,最终考虑的就是如何取舍。

ClickHouse在大促期间的优化

上文提到我们在ClickHouse中建立了不同粒度的聚合模型,包括PV粒度、UV粒度、有效用户粒度、成交用户粒度。11.11零点迎接流量峰值挑战,战报、实时明细、JDV大屏瞬时qps上千,触发熔断uv指标降级。服务侧每个集群线程池根据集群压测能力正在运行的sql数设置为60、80、120,集群侧sql执行时长30s超时查杀,集群cpu超过90%触发熔断,停止推数等操作保护集群。

索引优化,提取system.query_log慢查询top200优化索引,遵循过滤的字段尽可能放到前面的原则,提升查询效率。缓存前置,将bu和一、二级部门的访问请求模拟请求参数。

提前缓存结果,遍历bu、一级部门、二级部门查询,请求数十万次,这块可以考虑离线进行预计算,本次大促期间全部使用OLAP是为了减少刷岗的请求。

HLL的近似度在目前PV和UV的基数下,实际情况误差在0.15%左右,符合预期。流量明细、用户明细,每日新增千亿条数据,在数据查询阶段,我们主要针对集群参数做了调优在高并发大查询和集群稳定之间进行平衡,最终达到压测的要求。

总结与展望

京东黄金眼业务应用clickhouse替换presto的聚合计算,既可以提高开发效率,适应多维度变化,同时也可以降低计算资源,用clickhouse充当离线数据仓库的聚合层,并提供统一的接口服务,保证了数据的一致性和安全性。

我们在使用中也遇到了内存溢出、节点替换元数据不一致、下线故障节点无法进行ddl操作、集群有节点掉盘等问题。针对以上问题,EasyOLAP团队采取如下方案:优化集群配额、增加自动化监控节点、更新节点、下线节点等工具。

我们计划使用bitmap功能来支持UV等指标的精准去重操作,提升qps到50+。除此之外,为不断提高应用的流畅性与准确性,我们持续做出以下提升:指标拆分建表预聚合降低数据量提高并发,大小查询线程池分离提高用户体验,下载查询分离减少耦合,限流降速保护集群。

作者:陈洪健、李海波、宋恩杰、屠志强

来源:微信公众号:京东零售技术

出处:

标签: #spark vs clickhouse