增加Shuffle调优之合并map端输出文件

main
Oeljeklaus 7 years ago committed by GitHub
parent f13c7a3a00
commit 8a49bdad2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -518,6 +518,93 @@ spark.yarn.executor.memoryOverhead参数针对的的是yarn的提交方式。
通过这个参数调节以后,可以避免以上问题的避免。
对于等待时间就是在出现上述错误的时候连接不上拉去数据的Block Manager就会出现这个问题我们需要在spark-submit脚本中配置等待时长默认是60秒。
### Shuffle调优之shuffle原理
**什么样的情况下会发生shuffle?**
在spark中主要是以下几个算子groupByKey、reduceByKey、countByKey、join等等。
**什么是shuffle?**
groupByKey要把分布在集群各个节点上的数据中的同一个key对应的values都给集中到一块儿集中到集群中同一个节点上更严密一点说就是集中到一个节点的一个executor的一个task中。
然后呢集中一个key对应的values之后才能交给我们来进行处理<key, Iterable<value>>reduceByKey算子函数去对values集合进行reduce操作最后变成一个valuecountByKey需要在一个task中获取到一个key对应的所有的value然后进行计数统计总共有多少个valuejoinRDD<key, value>
RDD<key, value>只要是两个RDD中key相同对应的2个value都能到一个节点的executor的task中给我们进行处理。
**shuffle示意图**
![shuffle示意图](shuffle示意图.png)
1.每一个shuffle的前半部分stage的task每个task都会创建下一个stage的task数量相同的文件比如下一个stage会有100个task那么当前stage每个task都会创建100份文件会将同一个key对应的values一定是写入同一个文件中的不同节点上的task也一定会将同一个key对应的values写入下一个stage同一个task对应的文件中。
shuffle的后半部分stage的task每个task都会从各个节点上的task写的属于自己的那一份文件中拉取key, value对然后task会有一个内存缓冲区然后会用HashMap进行key, values的汇聚(key ,values)
task会用我们自己定义的聚合函数比如reduceByKey(_+_)把所有values进行一对一的累加聚合出来最终的值。就完成了shuffle
2.shuffle一定是分为两个stage来完成的。因为这其实是个逆向的过程不是stage决定shuffle是shuffle决定stage。
reduceByKey(_+_)在某个action触发job的时候DAGScheduler会负责划分job为多个stage。划分的依据就是如果发现有会触发shuffle操作的算子比如reduceByKey就将这个操作的前半部分以及之前所有的RDD和transformation操作划分为一个stageshuffle操作的后半部分以及后面的直到action为止的RDD和transformation操作划分为另外一个stage。
3.shuffle前半部分的task在写入数据到磁盘文件之前都会先写入一个一个的内存缓冲内存缓冲满溢之后再spill溢写到磁盘文件中。
### Shuffle调优之合并map端输出文件
**如果不合并map端输出文件会怎么样?**
前置条件每个executor有2个cpu core。4个task。task是线程执行的。所以先并行跑2个task再跑剩下2个task。
![合并map端输出文件](合并map端输出文件.png)
问题来了默认的这种shuffle行为对性能有什么样的恶劣影响呢
实际生产环境的条件:
100个节点每个节点一个executor100个executor
每个executor2个cpu core
总共1000个task每个executor平均10个task
每个节点10个task每个节点会输出多少份map端文件10 * 1000=1万个文件
总共有多少份map端输出文件100 * 10000 = 100万。
shuffle中的写磁盘的操作基本上就是shuffle中性能消耗最为严重的部分。
通过上面的分析一个普通的生产环境的spark job的一个shuffle环节会写入磁盘100万个文件。
磁盘IO对性能和spark作业执行速度的影响是极其惊人和吓人的。
基本上spark作业的性能都消耗在shuffle中了虽然不只是shuffle的map端输出文件这一个部分但是这里也是非常大的一个性能消耗点。
**开启map端输出文件合并**
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
开启shuffle map端输出文件合并的机制默认情况下是不开启的就是会发生如上所述的大量map端输出文件的操作严重影响性能。
![map端文件合并](map端文件合并.png)
开启了map端输出文件的合并机制之后
第一个stage同时就运行cpu core个task比如cpu core是2个并行运行2个task每个task都创建下一个stage的task数量个文件
第一个stage并行运行的2个task执行完以后就会执行另外两个task另外2个task不会再重新创建输出文件而是复用之前的task创建的map端输出文件将数据写入上一批task的输出文件中。
第二个stagetask在拉取数据的时候就不会去拉取上一个stage每一个task为自己创建的那份输出文件了而是拉取少量的输出文件每个输出文件中可能包含了多个task给自己的map端输出。
提醒一下map端输出文件合并
只有并行执行的task会去创建新的输出文件下一批并行执行的task就会去复用之前已有的输出文件但是有一个例外比如2个task并行在执行但是此时又启动要执行2个task那么这个时候的话就无法去复用刚才的2个task创建的输出文件了而是还是只能去创建新的输出文件。
要实现输出文件的合并的效果必须是一批task先执行然后下一批task再执行才能复用之前的输出文件负责多批task同时起来执行还是做不到复用的。
开启了map端输出文件合并机制之后生产环境上的例子会有什么样的变化
实际生产环境的条件:
100个节点每个节点一个executor100个executor
每个executor2个cpu core
总共1000个task每个executor平均10个task
每个节点2个cpu core有多少份输出文件呢2 * 1000 = 2000个
总共100个节点总共创建多少份输出文件呢100 * 2000 = 20万个文件
相比较开启合并机制之前的情况100万个
map端输出文件在生产环境中立减5倍
合并map端输出文件对咱们的spark的性能有哪些方面的影响呢
1、map task写入磁盘文件的IO减少100万文件 -> 20万文件
2、第二个stage原本要拉取第一个stage的task数量份文件1000个task第二个stage的每个task都要拉取1000份文件走网络传输合并以后100个节点每个节点2个cpu core第二个stage的每个task主要拉取100 * 2 = 200个文件即可网络传输的性能消耗是不是也大大减少
分享一下实际在生产环境中使用了spark.shuffle.consolidateFiles机制以后实际的性能调优的效果对于上述的这种生产环境的配置性能的提升还是相当的客观的。spark作业5个小时 -> 2~3个小时。
大家不要小看这个map端输出文件合并机制。实际上在数据量比较大你自己本身做了前面的性能调优executor上去->cpu core上去->并行度task数量上去shuffle没调优shuffle就很糟糕了大量的map端输出文件的产生。对性能有比较恶劣的影响。
这个时候,去开启这个机制,可以很有效的提升性能。
## troubleshooting
### troubleshooting之控制shuffle reduce端缓冲大小避免OOM
shuffle过程中优map端的task是不断的输出数据的数据量可能是很大的但是其实reduce端的task并不是等到map端task将属于自己的那份数据全部
@ -580,34 +667,6 @@ spark.shuffle.io.retryWait 5s
3.可以使用压缩算子提前性能。
## 数据倾斜
### shuffle原理
**什么样的情况下会发生shuffle?**
在spark中主要是以下几个算子groupByKey、reduceByKey、countByKey、join等等。
**什么是shuffle?**
groupByKey要把分布在集群各个节点上的数据中的同一个key对应的values都给集中到一块儿集中到集群中同一个节点上更严密一点说就是集中到一个节点的一个executor的一个task中。
然后呢集中一个key对应的values之后才能交给我们来进行处理<key, Iterable<value>>reduceByKey算子函数去对values集合进行reduce操作最后变成一个valuecountByKey需要在一个task中获取到一个key对应的所有的value然后进行计数统计总共有多少个valuejoinRDD<key, value>
RDD<key, value>只要是两个RDD中key相同对应的2个value都能到一个节点的executor的task中给我们进行处理。
**shuffle示意图**
![shuffle示意图](shuffle示意图.png)
1.每一个shuffle的前半部分stage的task每个task都会创建下一个stage的task数量相同的文件比如下一个stage会有100个task那么当前stage每个task都会创建100份文件会将同一个key对应的values一定是写入同一个文件中的不同节点上的task也一定会将同一个key对应的values写入下一个stage同一个task对应的文件中。
shuffle的后半部分stage的task每个task都会从各个节点上的task写的属于自己的那一份文件中拉取key, value对然后task会有一个内存缓冲区然后会用HashMap进行key, values的汇聚(key ,values)
task会用我们自己定义的聚合函数比如reduceByKey(_+_)把所有values进行一对一的累加聚合出来最终的值。就完成了shuffle
2.shuffle一定是分为两个stage来完成的。因为这其实是个逆向的过程不是stage决定shuffle是shuffle决定stage。
reduceByKey(_+_)在某个action触发job的时候DAGScheduler会负责划分job为多个stage。划分的依据就是如果发现有会触发shuffle操作的算子比如reduceByKey就将这个操作的前半部分以及之前所有的RDD和transformation操作划分为一个stageshuffle操作的后半部分以及后面的直到action为止的RDD和transformation操作划分为另外一个stage。
3.shuffle前半部分的task在写入数据到磁盘文件之前都会先写入一个一个的内存缓冲内存缓冲满溢之后再spill溢写到磁盘文件中。
### 数据倾斜解决方案之原理以及现象分析
**1.数据倾斜的原理**

Loading…
Cancel
Save