前言:
而今兄弟们对“spark数据处理总结800字”都比较关注,兄弟们都需要知道一些“spark数据处理总结800字”的相关文章。那么小编在网上搜集了一些关于“spark数据处理总结800字””的相关资讯,希望大家能喜欢,兄弟们快快来了解一下吧!上一篇Project Tungsten On Spark-内存设计 总结了Spark内存设计相关的知识点,本篇会快速为读者复习一下JVM相关的知识点,然后基于线上的GC调优对spark整体的调优做一个汇总,希望能让读者在调优之路更上一层楼。
一般在我们开发spark程序的时候,从代码开发到上线以及后期的维护中,在整个过程中都需要涉及到调优的问题,即一开始需要考虑如何把代码写的更简洁高效调优(即代码优化),待开发测试完成后,提交任务时综合考量该任务所需的资源(这里涉及到资源调优),上线后是否会出现数据倾斜问题(即倾斜调优),以及是否出现频繁GC问题(这里涉及到GC调优)。
那么本篇通过反推的模式,即通过GC调优进行延伸扩展,比如出现GC问题是不是可能出现了倾斜?如果没有出现倾斜,是不是我们给的资源不足?如果资源充足的话,那么是不是我们代码写的有问题呢(比如频繁创建对象等操作)?按照这样一个思路展开来总结spark的调优。
JVM的堆、栈、方法区
如上图所示,JVM主要由类加载器系统、运行时数据区、执行引擎和本地接口等组成。
其中运行时数据区又由方法区、堆、Java栈、PC寄存器、本地方法栈组成。
当JVM加载一个class文件后,class中的参数、类型等信息会存储到方法区中,程序运行时所创建的对象存储在堆中(堆中不放基本类型和对象引用,只存放对象本身)。当每个新线程启动时,会有自己的程序计数器(Program Counter Register)和栈,当线程调用方法时,程序计数器表明下一条执行的指令,同时线程栈会存储线程的方法调用状态(包括局部变量、被调用的参数、中间结果等)。本地方法调用存储在独立的本地方法栈中,或其他独立的内存区域中。
栈区由栈桢组成,每个栈桢就是每个调用的方法的栈,当方法调用结束后,JVM会弹栈,即抛弃此方法的栈桢。
JVM内存划分
上图中的划分是基于JDK7和JDK8,其中有一些变动(主要是永久代的移除)。
JVM内存从大体上划分为三部分:年轻代、老年代、永久代(元空间)
年轻代:所有新生成的对象都会先放到年轻代,年轻代又分为三个区:Eden区、两个Survivor,三者之间的比例为8:1:1。
Eden区:大部分对象会在该区生成,当在Eden区申请空间失败后,会触发Scavenge GC,对Eden区进行GC,清除非存活对象,并把还存活的对象复制到其中一个Survivor区中。这里可能会有一个问题,由于默认情况下Eden:Survivor1:Survivor2的内存占比是8:1:1,如果存活下来的对象是1.5,一个Survivor区域放不下,那么这个时候就会利用JVM的担保机制,将多余的对象直接放入老年代,会出现老年代囤积一大堆短生命周期的,导致老年代频繁溢满,频繁进行Full GC去回收老年代中的对象
Survivor区:当Eden区满后,会把还存活的对象复制到其中一个S区中,且两个S区之间没有先后顺序关系,同时根据程序需要Survivor区是可以配置多个的,这样可以增加对象在年轻代存在的时间,减少被放到老年代的可能。JVM每次只会使用Eden和其中一块Survivor区域来为对象服务,所以无论什么时候总会有一块Survivor区域是空闲的,也就是说年轻代实际可用的内存空间为9/10的年轻代空间。
老年代:在年轻代中经历了N次GC之后仍然存活的对象,就会被放到老年代中。该区域通常存放一些生命周期较长的对象。默认情况下,年轻代和老年代的比值为1:2,即老年代占用堆空间大小的2/3,当然这个值可以通过-XX:NewRation来调整
持久代:主要存放静态文件、Java类、方法等。在Java 8中该区域已经被移除了,开始使用本地化的内存来存放类的元数据,也称之元空间
JVM GC
JVM主要管理两种类型的内存:堆和非堆,简单来说,堆就是Java代码可及的内存,是留给开发人员用的,非堆就是JVM留给自己用的。
对于Java的内存管理来说其实就是对象的管理,包括对象的分配和释放。对于GC来说,当我们创建对象的时候,GC就开始监控这个对象地址、大小以及使用情况,通常GC采用有向图的方式记录管理堆中所有对象,通过这种方式来确定哪些对象是可达的,哪些对象是不可达的。具体的GC流程如下:
当Eden满了之后,一个小型的GC就会被触发(Minor GC),Eden和Survivor1中幸存仍被使用的对象被复制到Survivor2。Survivor1和Survivor2区域进行交换,当一个对象生存的时间足够长或者Survivor2满了之后,就会被转移到Old代当Old空间快满的时候,这个时候会进行Full GC
一般以下几种情况可能会导致Full GC:
当Old空间被写满时System.GC()被显式调用上一次GC之后,Heap的各个区域分配策略动态变化
以上简单说明一下jvm相关知识点,其实spark GC的目的就是要确保老年代只保存长生命周期RDD,同时年轻代的空间又能够保存短生命周期的对象,这样就能避免启动Full GC
Spark对JVM的使用
基于上篇Tungsten on spark 文章的整理,Executor对内存的使用主要有以下几个部分:
RDD存储。当对RDD调用persist或Cache方法时,RDD的partitons会被存储到内存里,那么这块内存也就是Storage内存。Shuffle操作。当发生Shuffle时,需要缓冲区来存储Shuffle的输出和聚合的中间结果,该块内存称之为Execution内存。用户代码。用户编写的代码能够使用的内存空间,也就是其他内存(用户内存)
在统一内存模式下,整个堆空间分为Spark Memory和User Memory,其中Spark Memory包括Storage Memory和Execution Memory,而且两者之间可以互相借用空间。
通过spark.memory.fraction参数来控制Spark Memory在整个堆空间所占的比例
通过spark.memory.storageFraction来设置Storage Memory占Spark Memory的比例,如果Spark作业中有较多的RDD持久化操作,该参数值可以适当调高,保证持久化的数据能够容纳在内存中,避免内存不够缓存所有的数据,只能写入磁盘中,降低性能。如果Spark作业中Shuffle类操作比较多,持久化类操作比较少,那么可以适当降低该参数值。
这里给出一个实际的例子来说明一下spark是如何分配内存的
/usr/local/spark-current/bin/spark-submit \--master yarn \--deploy-mode client \--executor-memory 1G \--queue root.default \--class my.Application \--conf spark.ui.port=4052 \--conf spark.port.maxRetries=100 \--num-executors 2 \--jars mongo-spark-connector_2.11-2.3.1.jar \App.jar 20201118000000# 这里配置两个Executor,每个Executor内存给1G
如图所示,spark申请到了两个Executor,每个Executor得到的Storage Memory内存分别为384.1MB(注意:这里Storage Memory其实就是Storage+Execution的总和内存),这里有一个疑惑,我们分配的是每个Executor内存为1G,为什么只得到384MB呢?这里给出具体的计算公式:
我们申请为1G内存,但是真正拿到内存会比这个少,这里涉及到一个Runtime.getRuntime.maxMemory 值的计算(在上篇文章中关于UnifiedMemoryManager源码分析中提到过),Runtime.getRuntime.maxMemory对应的值才是程序能够使用的最大内存,上面也提到了堆划分了Eden,Survivor,Tenured区域,所以该值计算公式为: ExecutorMemory = Eden + 2 * Survivor + Tenured = 1GB = 1073741824 字节systemMemory = Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured = 954437176.888888888888889 字节
//org.apache.spark.memory.UnifiedMemoryManager(这里讨论的还是动态内存模型)private def getMaxMemory(conf: SparkConf): Long = { val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) val reservedMemory = conf.getLong("spark.testing.reservedMemory", if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES) val usableMemory = systemMemory - reservedMemory val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) //这里即获取最大的内存值 (usableMemory * memoryFraction).toLong}基于Spark的动态内存模型设计,其中有300MB的预留内存,因此剩余可用内存为总申请得到的内存-预留内存reservedMemory = 300MB = 314572800字节usableMemory = systemMemory - reservedMemory = 954437176.888888888888889 - 314572800 = 639864376.888888888888889字节Spark Web UI界面上虽然显示的是Storage Memory,但其实是Execution+Storage内存,即该部分占用60%比例Storage + Execution = usableMemory * 0.6 = 639864376.888888888888889 * 0.6 = 383918626.133333333333333 字节通过第三步骤即可看出实际的内存分配情况了,注意:web ui界面得到的结果计算是除于1000转换得到的值。
GC调优步骤统计一下GC启动的频率和GC使用的总时间,即在spark-submit提交的时候设置参数即可
如图所示,这里提高了spark.memory.fraction参数值,则每个Exectuor实际可用的内存也随之增加了.
/usr/local/spark-current/bin/spark-submit \--master yarn \--deploy-mode client \--executor-memory 1G \--driver-memory 1G \--queue root.default \--class my.Application \--conf spark.ui.port=4052 \--conf spark.port.maxRetries=100 \--num-executors 2 \--jars mongo-spark-connector_2.11-2.3.1.jar \--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \--conf spark.memory.fraction=0.8 \App.jar
如图所示,出现了多次Full GC,首先考虑的是可能配置的Executor内存较低,这个时候需要增加Executor Memory来调节。
检查GC日志中是否有过于频繁的GC。如果一个任务结束前,Full GC执行多次,说明老年代空间被占满了,那么有可能是没有分配足够的内存如果有太多Minor GC,但是Full GC不多,可以给Eden分配更多的内存.
3.1.比如Eden代的内存需求量为E,可以设置Young代的内存为-Xmn=4/3*E,设置该值也会导致Survivor区域扩张
3.2.调整Eden在年轻代所占的比例,配置-XX:SurvivorRatio的比例值
调整垃圾回收器,通常使用G1GC,即配置-XX:+UseG1GC。当Executor的堆空间比较大时,可以提升G1 region size(-XX:G1HeapRegionSize)
/usr/local/spark-current/bin/spark-submit \--master yarn \--deploy-mode client \--executor-memory 1G \--driver-memory 1G \--queue root.default \--class my.Application \--conf spark.ui.port=4052 \--conf spark.port.maxRetries=100 \--num-executors 2 \--jars mongo-spark-connector_2.11-2.3.1.jar \--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=16M -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \--conf spark.memory.fraction=0.8 \App.jar优化代码,尽量多使用array和string,并使用kyro序列,让每个Partition都成为字节数组结合实际的需求,调整缓存和shuffle计算所占的内存比例,即当代码中出现shuffle类操作比较多,而不需要太多缓存的话,则可以适当降低Storage Memory所占比例;当缓存操作比较多,而Shuffle类操作比较少的话,可以适当调低Execution Memory所占比例。主要是通过spark.storage.storageFraction来控制开启堆外内存,设置堆外内存大小,这里为了避免OOM
spark.memory.offHeap.size=4Gspark.memory.offHeap.enabled=true
注意:这里需要说明一下spark.executor.memoryOverhead 和spark.memory.offHeap.size之间的区别
spark.executor.memoryOverhead是属于JVM堆外内存,用于JVM自身的开销、内部的字符串还有一些本地开销,spark不会对这块内存进行管理。默认大小为ExecutorMemory的10%,在spark2.4.5之前,该参数的值应该包含spark.memory.offHeap.size的值。比如spark.memory.offHeap.size配置500M,spark.executor.memoryOverhead默认为384M,那么memoryOverhead的值应该为884M。
//spark2.4.5之前的// Executor memory in MB.protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt// Additional memory overhead.protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toIntprotected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) { sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)} else { 0}// Resource capability requested for each executorsprivate[yarn] val resource = Resource.newInstance( executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)//由于memoryOverHead的参数值理解起来比较困难,而且不易于用户对每个特定的内存区域进行自定义配置,所以在Spark3.0之后进行了拆分//spark3.0之后的资源申请更改为private[yarn] val resource: Resource = { val resource = Resource.newInstance( executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores) ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource) logDebug(s"Created resource capability: $resource") resource }
spark.memory.offHeap.size这个参数指定的内存(广义上是指所有堆外的),这部分内存的申请和释放是直接进行的,不由JVM管理,所以这块是没有GC的。
倾斜调优
倾斜部分的调优可以阅读下面两篇文章,相对来说已经比较全了
Spark数据倾斜之骚操作解决方案
数据开发必经之路-数据倾斜
开发调优
相信有很多读者应该非常熟悉以下这几种使用姿势了,这里就不再重复详细说明了
避免创建重复的RDD尽可能复用同一个RDD对多次使用的RDD进行持久化尽量避免使用Shuffle算子使用map-side预聚合的shuffle操作使用高性能的算子6.1: 使用reduceByKey/aggregateByKey替代groupByKey6.2: 使用mapPartitions替代普通map6.3: 使用foreachPartitions替代foreach6.4: 使用filter之后进行coalesce操作6.5: 使用repartitionAndSortWithinPartitions替代repartition与sort类操作广播大变量
val list1 = ...val list1Broadcast = sc.broadcast(list1)rdd1.map(list1Broadcast...)使用kryo优化序列化性能
// 创建SparkConf对象conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 设置序列化器为KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))优化数据结构,尽量使用字符串代替对象,使用原始类型(如int,Long)代替字符串,使用数组代替集合类型资源参数调优
众所周知,引起GC主要是内存资源问题,一般情况下是不需要对GC进行调优的。当出现GC问题时,那么就需要思考是哪个环节造成内存紧张。首先想到的应该是配置的内存不足,直接加资源,这里整理了一些配置参数,仅供读者参考。
因排版问题且涉及参数较多,这里涉及到的参数大家可关注公众号“进击吧大数据”进行阅读了解
标签: #spark数据处理总结800字