diff --git a/README.md b/README.md index 15a014f..4a02f74 100644 --- a/README.md +++ b/README.md @@ -518,6 +518,93 @@ spark.yarn.executor.memoryOverhead参数,针对的的是yarn的提交方式。 通过这个参数调节以后,可以避免以上问题的避免。 对于等待时间,就是在出现上述错误的时候,连接不上拉去数据的Block Manager,就会出现这个问题,我们需要在spark-submit脚本中配置等待时长,默认是60秒。 +### Shuffle调优之shuffle原理 +**什么样的情况下,会发生shuffle?** + +在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。 + +**什么是shuffle?** + +groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。 + +然后呢,集中一个key对应的values之后,才能交给我们来进行处理,>;reduceByKey,算子函数去对values集合进行reduce操作,最后变成一个value;countByKey,需要在一个task中,获取到一个key对应的所有的value,然后进行计数,统计总共有多少个value;join,RDD, +RDD,只要是两个RDD中,key相同对应的2个value,都能到一个节点的executor的task中,给我们进行处理。 + +**shuffle示意图** +![shuffle示意图](shuffle示意图.png) + +1.每一个shuffle的前半部分stage的task,每个task都会创建下一个stage的task数量相同的文件,比如下一个stage会有100个task,那么当前stage每个task都会创建100份文件;会将同一个key对应的values,一定是写入同一个文件中的;不同节点上的task,也一定会将同一个key对应的values,写入下一个stage,同一个task对应的文件中。 + +shuffle的后半部分stage的task,每个task都会从各个节点上的task写的属于自己的那一份文件中,拉取key, value对;然后task会有一个内存缓冲区,然后会用HashMap,进行key, values的汇聚;(key ,values); + +task会用我们自己定义的聚合函数,比如reduceByKey(_+_),把所有values进行一对一的累加;聚合出来最终的值。就完成了shuffle + +2.shuffle,一定是分为两个stage来完成的。因为这其实是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。 + +reduceByKey(_+_),在某个action触发job的时候,DAGScheduler,会负责划分job为多个stage。划分的依据,就是,如果发现有会触发shuffle操作的算子,比如reduceByKey,就将这个操作的前半部分,以及之前所有的RDD和transformation操作,划分为一个stage;shuffle操作的后半部分,以及后面的,直到action为止的RDD和transformation操作,划分为另外一个stage。 + +3.shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲满溢之后,再spill溢写到磁盘文件中。 +### Shuffle调优之合并map端输出文件 +**如果不合并map端输出文件会怎么样?** +前置条件:每个executor有2个cpu core。4个task。task是线程执行的。所以先并行跑2个task,再跑剩下2个task。 +![合并map端输出文件](合并map端输出文件.png) +问题来了:默认的这种shuffle行为,对性能有什么样的恶劣影响呢? +实际生产环境的条件: +100个节点(每个节点一个executor):100个executor +每个executor:2个cpu core +总共1000个task:每个executor平均10个task + +每个节点,10个task,每个节点会输出多少份map端文件?10 * 1000=1万个文件 + +总共有多少份map端输出文件?100 * 10000 = 100万。 + +shuffle中的写磁盘的操作,基本上就是shuffle中性能消耗最为严重的部分。 + +通过上面的分析,一个普通的生产环境的spark job的一个shuffle环节,会写入磁盘100万个文件。 + +磁盘IO对性能和spark作业执行速度的影响,是极其惊人和吓人的。 + +基本上,spark作业的性能,都消耗在shuffle中了,虽然不只是shuffle的map端输出文件这一个部分,但是这里也是非常大的一个性能消耗点。 +**开启map端输出文件合并** +new SparkConf().set("spark.shuffle.consolidateFiles", "true") + +开启shuffle map端输出文件合并的机制;默认情况下,是不开启的,就是会发生如上所述的大量map端输出文件的操作,严重影响性能。 +![map端文件合并](map端文件合并.png) +开启了map端输出文件的合并机制之后: + +第一个stage,同时就运行cpu core个task,比如cpu core是2个,并行运行2个task;每个task都创建下一个stage的task数量个文件; + +第一个stage,并行运行的2个task执行完以后;就会执行另外两个task;另外2个task不会再重新创建输出文件;而是复用之前的task创建的map端输出文件,将数据写入上一批task的输出文件中。 + +第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每一个task为自己创建的那份输出文件了;而是拉取少量的输出文件,每个输出文件中,可能包含了多个task给自己的map端输出。 +提醒一下(map端输出文件合并): + +只有并行执行的task会去创建新的输出文件;下一批并行执行的task,就会去复用之前已有的输出文件;但是有一个例外,比如2个task并行在执行,但是此时又启动要执行2个task;那么这个时候的话,就无法去复用刚才的2个task创建的输出文件了;而是还是只能去创建新的输出文件。 + +要实现输出文件的合并的效果,必须是一批task先执行,然后下一批task再执行,才能复用之前的输出文件;负责多批task同时起来执行,还是做不到复用的。 +开启了map端输出文件合并机制之后,生产环境上的例子,会有什么样的变化? + +实际生产环境的条件: +100个节点(每个节点一个executor):100个executor +每个executor:2个cpu core +总共1000个task:每个executor平均10个task + +每个节点,2个cpu core,有多少份输出文件呢?2 * 1000 = 2000个 +总共100个节点,总共创建多少份输出文件呢?100 * 2000 = 20万个文件 + +相比较开启合并机制之前的情况,100万个 + +map端输出文件,在生产环境中,立减5倍! +合并map端输出文件,对咱们的spark的性能有哪些方面的影响呢? + +1、map task写入磁盘文件的IO,减少:100万文件 -> 20万文件 +2、第二个stage,原本要拉取第一个stage的task数量份文件,1000个task,第二个stage的每个task,都要拉取1000份文件,走网络传输;合并以后,100个节点,每个节点2个cpu core,第二个stage的每个task,主要拉取100 * 2 = 200个文件即可;网络传输的性能消耗是不是也大大减少 + +分享一下,实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果:对于上述的这种生产环境的配置,性能的提升,还是相当的客观的。spark作业,5个小时 -> 2~3个小时。 + +大家不要小看这个map端输出文件合并机制。实际上,在数据量比较大,你自己本身做了前面的性能调优,executor上去->cpu core上去->并行度(task数量)上去,shuffle没调优,shuffle就很糟糕了;大量的map端输出文件的产生。对性能有比较恶劣的影响。 + +这个时候,去开启这个机制,可以很有效的提升性能。 ## troubleshooting ### troubleshooting之控制shuffle reduce端缓冲大小避免OOM shuffle过程中优map端的task是不断的输出数据的,数据量可能是很大的,但是,其实reduce端的task,并不是等到map端task将属于自己的那份数据全部 @@ -580,34 +667,6 @@ spark.shuffle.io.retryWait 5s 3.可以使用压缩算子提前性能。 ## 数据倾斜 -### shuffle原理 -**什么样的情况下,会发生shuffle?** - -在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。 - -**什么是shuffle?** - -groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。 - -然后呢,集中一个key对应的values之后,才能交给我们来进行处理,>;reduceByKey,算子函数去对values集合进行reduce操作,最后变成一个value;countByKey,需要在一个task中,获取到一个key对应的所有的value,然后进行计数,统计总共有多少个value;join,RDD, -RDD,只要是两个RDD中,key相同对应的2个value,都能到一个节点的executor的task中,给我们进行处理。 - -**shuffle示意图** -![shuffle示意图](shuffle示意图.png) - -1.每一个shuffle的前半部分stage的task,每个task都会创建下一个stage的task数量相同的文件,比如下一个stage会有100个task,那么当前stage每个task都会创建100份文件;会将同一个key对应的values,一定是写入同一个文件中的;不同节点上的task,也一定会将同一个key对应的values,写入下一个stage,同一个task对应的文件中。 - -shuffle的后半部分stage的task,每个task都会从各个节点上的task写的属于自己的那一份文件中,拉取key, value对;然后task会有一个内存缓冲区,然后会用HashMap,进行key, values的汇聚;(key ,values); - -task会用我们自己定义的聚合函数,比如reduceByKey(_+_),把所有values进行一对一的累加;聚合出来最终的值。就完成了shuffle - -2.shuffle,一定是分为两个stage来完成的。因为这其实是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。 - -reduceByKey(_+_),在某个action触发job的时候,DAGScheduler,会负责划分job为多个stage。划分的依据,就是,如果发现有会触发shuffle操作的算子,比如reduceByKey,就将这个操作的前半部分,以及之前所有的RDD和transformation操作,划分为一个stage;shuffle操作的后半部分,以及后面的,直到action为止的RDD和transformation操作,划分为另外一个stage。 - -3.shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲满溢之后,再spill溢写到磁盘文件中。 - - ### 数据倾斜解决方案之原理以及现象分析 **1.数据倾斜的原理**