You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
javac/README.md

50 KiB

电商用户行为分析大数据平台

项目介绍

1.基于Spark开发的平台

2.需要有spark基础

3.有很多高级知识和设计模式

4.电商用户行为分析大数据平台(项目名称)

5.访问行为,购物行为,广告点击行为,对这些行为进行分析,使用大数据技术来帮助公司提升业绩。

6.主要的功能模块有用户session分析页面单跳转化率统计热门商品离线统计广告流量实时统计等4个业务模块。

7.所使用的知识点是spark corespark SQLspark streaming等三个技术框架。

8.主要是数据倾斜线上故障性能调优troubleshooting等经验。

9.使用模拟数据,希望达到的效果。

10.需求分析,方案设计,数据设计,编码实现,测试以及性能调优等环节。

模块简介

1、用户访问session分析该模块主要是对用户访问session进行统计分析包括session的聚合指标计算、按时间比例随机抽取session、获取每天点击、下单和购买排名前10的品类、并获取top10品类的点击量排名前10的session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标从而对公司的产品设计以及业务发展战略做出调整。主要使用Spark Core实现。

2、页面单跳转化率统计该模块主要是计算关键页面之间的单步跳转转化率涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率从而对网页布局进行更好的优化设计。主要使用Spark Core实现。

3、热门商品离线统计该模块主要实现每天统计出各个区域的top3热门商品。然后使用Oozie进行离线统计任务的定时调度使用Zeppeline进行数据可视化的报表展示。该模块可以让企业管理层看到公司售卖的商品的整体情况从而对公司的商品相关的战略进行调整。主要使用Spark SQL实现。

4、广告流量实时统计该模块负责实时统计公司的广告流量包括广告展现流量和广告点击流量。实现动态黑名单机制以及黑名单过滤实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计实现每个区域每个广告的点击流量实时统计实现每个区域top3点击量的广告的统计。主要使用Spark Streaming实现。

用户访问Session分析模块

用户session功能介绍

1.对用户访问session进行分析可以根据使用者指定的某些条件筛选出指定的一些用户对用户在指定日期范围内发起session进行聚合统计比如统计出访问时长在某段时间的session占总session数量的比例按时间比例。

2.JDBC辅助类封装

3.获取点击量下单量和支付量都排名10的商品种类

4.获取top10的商品种类的点击数量排名前10的session

5.开发完毕了以上功能以后,需要进行大量,复杂,高端,全套的性能调优。

6.10亿数据量的troubleshooting的经验总结

7.数据倾斜的完美解决方案,数据倾斜往往是大数据处理程序的性能杀手。

8.把使用moc的数据对模块进行处理调试。

实际架构

1.javaee平台接受到执行统计分析任务的请求之后会调用封装了spark-submit的shell脚本执行。

2.spark作业获取指定参数然后运行复杂的作业逻辑进行该模块的统计和分析。

3.spark作业统计和分析的结果会写入mySQL中指定的表。

4.最后通过Javaee进行结果展示。

用户访问session介绍

1.用户在电商网站上,通常会有很多点击行为,首页通常都是进入首页,然后可能点击首页上的一些商品,点击首页上的一些品类,也可能随时在搜索框里面搜索关键词,还可能将一些商品加入购物车,对购物车中的多个商品下订单,最后对订单中的多个商品进行支付。

2.用户的每一次操作可以理解为一个action比如支付。

3.用户session指的是用户第一次进入首页session就开始了。然后在一定时间范围内直到最后结束离开网站关闭浏览器或者长时间没有操作那么session就结束了

4.以上用户在网站内的访问过程就称为一次session。简单的说session就是某一天时间内某个用户对网站从打开或者进入到做了大量操作关闭浏览器的过程。就叫做session。

数据结构的介绍

user_visit_action表

      其实就是放比如说网站或者是app每天的点击流的数据。可以理解为用户对网站/app每点击一下就会代表在这个表里面的一条数据。

表名user_visit_actionHive表

date日期代表这个用户点击行为是在哪一天发生的

user_id代表这个点击行为是哪一个用户执行的

session_id 唯一标识了某个用户的一个访问session

page_id:点击了某些商品/品类也可能是搜索了某个关键词然后进入了某个页面页面的id

action_time:这个点击行为发生的时间点

search_keyword:如果用户执行的是一个搜索行为,比如说在网站/app中搜索了某个关键词然后会跳转到商品列表页面搜索的关键词

click_category_id:可能是在网站首页,点击了某个品类(美食、电子设备、电脑)

click_product_id:可能是在网站首页或者是在商品列表页点击了某个商品比如呷哺呷哺火锅XX路店3人套餐、iphone 6s

order_category_ids:代表了可能将某些商品加入了购物车然后一次性对购物车中的商品下了一个订单这就代表了某次下单的行为中有哪些商品品类可能有6个商品但是就对应了2个品类比如有3根火腿肠食品品类3个电池日用品品类

order_product_ids:某次下单,具体对哪些商品下的订单

pay_category_ids:代表的是,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品类

pay_product_id:代表的,支付行为下,对应的哪些具体的商品

user_info表

       实际上,就是一张最普通的用户基础信息表;这张表里面,其实就是放置了网站/app所有的注册用户的信息。那么我们这里也是对用户信息表进行了一定程度的简化。比如略去了手机号等这种数据。因为我们这个项目里不需要使用到某些数据。那么我们就保留一些最重要的数据即可。

表名user_infoHive表

user_id其实就是每一个用户的唯一标识通常是自增长的Long类型BigInt类型

username是每个用户的登录名

name每个用户自己的昵称、或者是真实姓名

age用户的年龄

professional用户的职业

city用户所在的城市

task表

      其实是用来保存平台的使用者通过J2EE系统提交的基于特定筛选参数的分析任务的信息就会通过J2EE系统保存到task表中来。之所以使用MySQL表是因为J2EE系统是要实现快速的实时插入和查询的。

表名taskMySQL表

task_id表的主键

task_name任务名称

create_time创建时间

start_time开始运行的时间

finish_time结束运行的时间

task_type任务类型就是说在一套大数据平台中肯定会有各种不同类型的统计分析任务比如说用户访问session分析任务页面单跳转化率统计任务所以这个字段就标识了每个任务的类型

task_status任务状态任务对应的就是一次Spark作业的运行这里就标识了Spark作业是新建还没运行还是正在运行还是已经运行完毕

task_param最最重要用来使用JSON的格式来封装用户提交的任务对应的特殊的筛选参数.

总体任务的流程

项目整个流程

用户访问Session分析需求分析

需求分析

       在互联网企业中,需求分析,首先就是要和产品经理也就是负责设计你开发的大数据平台产品的人,去大量开会,去沟通需求的细节。

1.按条件筛选session

       搜索过某些关键词的用户访问时间在某个时间段的用户年纪在某个范围的用户职业在某个范围内的用户所在某个城市的用户发起的session。找到对应用户的session也就是我们所说的第一步按照条件筛选session。

最大的作用就是灵活。也就是说可以让使用者,对感兴趣的和关心的用户群体,进行后续各个复杂业务逻辑的统计和分析,那么拿到结果数据就是只是针对特殊用户群体的分析结果。

2.统计出符合条件的session中访问时长在1-2s4-6s等各个范围内的session占比等。

       session访问时长也就是说一个session对应的开始的action到结束的session之间的时间范围。还有就是访问步长指的是一个session执行期间内依次点击多少个页面。

可以让人从全局的角度看到,符合某些条件的用户群体,使用产品的习惯,对于使用者来说,有一个全局和清晰的认识。

3.在符合添加的session中按照时间比例随机抽取1000个session。

       这个功能的作用是可以让使用者能够对符合条件的session按照时间比例均匀的随机采取1000个session然后观察每个session具体的点击流行为比如先进入首页然后点击食品品类然后点具体商品然后下单支付。

之所以使用随机采样,就是观察样本的公平性。

4.在符合条件的session中获取点击下单和支付数量前10的品类。

       计算出所有这些session对各个品类的点击下单和支付的次数然后按照这三个属性进行排序获取前10品类。

这个功能很重要,可以让我们明白就是符合条件的用户,他最感兴趣的商品是什么总类,这个可以让公司里的人,清晰地了解不同的层次,不同的用户的心理和喜好。

5.对排名前10的品类分别获取其点击次数排名前10的session

       这个功能可以让我们了解对某个用户群体最感兴趣的品类各个品类最感兴趣最典型的用户的session的行为

用户访问session分析技术方案

这个过程涉及到技术的选型前端是JavaEE+Spark+MySQL。实现需求你的技术实现思路以及在思路中可能使用到的技术要点。

1.按条件筛选           按条件筛选session但是这个筛选的粒度是不同的比如说搜索词访问时间那么这个都是session粒度甚至是action粒度那么还有就是针对用户的基础信息进行筛选年龄性别职业所以说筛选粒度是不统一的。

每天的用户访问数据量是很大的,针对于筛选粒度不统一的问题,以及数据量巨大,可能会有两个问题,首先第一个,就是如果不统一筛选粒度,那么就必须得对所有的数据进行全量的扫描,第二个就是扫描的话,量实在是太大了。

为了解决以上问题我们这里对原始的数据进行聚合什么粒度聚合呢session粒度的聚合。也就是说用一些最基本的筛选条件比如时间范围从hive表中提取数据

然后呢按照session ID这个字段进行聚合统计那么聚合后的一天记录就是一个用户的某个session在指定时间内的访问记录比如搜过所有的关键词点击过的所有品类ID点击session对应的userID关联的用户的基础信息。

聚合过后针对session粒度的数据按照使用者指定的筛选条件进行数据的筛选。筛选出来符合条件的用户session粒度数据其实就是我们想要的那些session。

2.聚合统计

         对于某个时间段的session的数量就需要累加基本上实现这个最好的选择就是accumulator变量但是问题又来了如果使用基础的accumulator变量导致维护变得更加复杂在修改代码的时候很可能会导致错误。

我们可以使用自定义的accumulator的技术实现复杂的分布式计算。

3.在符合条件的session中按照时间比例随机抽取1000个session

         使用groupbykey等算子进行计算

4.在符合条件的session中获取点击下单和支付数量排名前10的品类

         需要对每个品类的点击下单和支付数量都进行计算。使用spark的自定义key进行二次排序技术来实现所有的品类按照三个字段点击数量下单数量支付数量依次进行排序首先比较点击数量如果相同的话那么比较下单数量如果还是相同那么比较支付数量。

5.对排名前10的品类分别获取点击次数前10的session

         这个需求需要使用spark的分组获取topN的算法进行实现。也就是对排名前10的品类对应的数据按照品类ID进行分组然后求出每组点击数量排名前10的session。

总结

总结一下,可以学习到的知识点

1.通过底层数据聚合来减少spark作业处理数据从而提升spark作业的性能从根本上提升spark性能的技巧。

2.自定义accumulator实现复杂分布式计算的技术

3.spark按时间比例随机抽取算法

4.spark自定义key二次排序技术

5.spark分组取topN算法

6.通过spark的各个功能和技术点进行各种聚合采样排序取topN业务的实现。

用户访问session数据表分析

       在进行完了数据调研、需求分析、技术实现方案进行数据设计。数据设计往往包含两个环节第一个呢就是说我们的上游数据就是数据调研环节看到的项目基于的基础数据是否要针对其开发一些Hive ETL对数据进行进一步的处理和转换从而让我们能够更加方便的和快速的去计算和执行spark作业第二个就是要设计spark作业要保存结果数据的业务表的结构从而让J2EE平台可以使用业务表中的数据来为使用者展示任务执行结果。

第一表session_aggr_stat表存储第一个功能session聚合统计的结果

CREATE TABLE `session_aggr_stat` (
  `task_id` int(11) NOT NULL,
  `session_count` int(11) DEFAULT NULL,
  `1s_3s` double DEFAULT NULL,
  `4s_6s` double DEFAULT NULL,
  `7s_9s` double DEFAULT NULL,
  `10s_30s` double DEFAULT NULL,
  `30s_60s` double DEFAULT NULL,
  `1m_3m` double DEFAULT NULL,
  `3m_10m` double DEFAULT NULL,
  `10m_30m` double DEFAULT NULL,
  `30m` double DEFAULT NULL,
  `1_3` double DEFAULT NULL,
  `4_6` double DEFAULT NULL,
  `7_9` double DEFAULT NULL,
  `10_30` double DEFAULT NULL,
  `30_60` double DEFAULT NULL,
  `60` double DEFAULT NULL,
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

第二个表session_random_extract表存储我们的按时间比例随机抽取功能抽取出来的1000个session

CREATE TABLE `session_random_extract` (
  `task_id` int(11) NOT NULL,
  `session_id` varchar(255) DEFAULT NULL,
  `start_time` varchar(50) DEFAULT NULL,
  `end_time` varchar(50) DEFAULT NULL,
  `search_keywords` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

第三个表top10_category表存储按点击、下单和支付排序出来的top10品类数据

CREATE TABLE `top10_category` (
  `task_id` int(11) NOT NULL,
  `category_id` int(11) DEFAULT NULL,
  `click_count` int(11) DEFAULT NULL,
  `order_count` int(11) DEFAULT NULL,
  `pay_count` int(11) DEFAULT NULL,
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

第四个表top10_category_session表存储top10每个品类的点击top10的session

CREATE TABLE `top10_category_session` (
  `task_id` int(11) NOT NULL,
  `category_id` int(11) DEFAULT NULL,
  `session_id` varchar(255) DEFAULT NULL,
  `click_count` int(11) DEFAULT NULL,
   PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

最后一张表session_detail用来存储随机抽取出来的session的明细数据、top10品类的session的明细数据

CREATE TABLE `session_detail` (
  `task_id` int(11) NOT NULL,
  `user_id` int(11) DEFAULT NULL,
  `session_id` varchar(255) DEFAULT NULL,
  `page_id` int(11) DEFAULT NULL,
  `action_time` varchar(255) DEFAULT NULL,
  `search_keyword` varchar(255) DEFAULT NULL,
  `click_category_id` int(11) DEFAULT NULL,
  `click_product_id` int(11) DEFAULT NULL,
  `order_category_ids` varchar(255) DEFAULT NULL,
  `order_product_ids` varchar(255) DEFAULT NULL,
  `pay_category_ids` varchar(255) DEFAULT NULL,
  `pay_product_ids` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

额外的一张表task表用来存储J2EE平台插入其中的任务的信息

CREATE TABLE `task` (
  `task_id` int(11) NOT NULL AUTO_INCREMENT,
  `task_name` varchar(255) DEFAULT NULL,
  `create_time` varchar(255) DEFAULT NULL,
  `start_time` varchar(255) DEFAULT NULL,
  `finish_time` varchar(255) DEFAULT NULL,
  `task_type` varchar(255) DEFAULT NULL,
  `task_status` varchar(255) DEFAULT NULL,
  `task_param` text,
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

完成了数据调研、需求分析、技术方案设计、数据设计以后,正式进入编码实现和功能测试阶段。最后才是性能调优阶段。

编码以及实现思路

数据筛选与聚合

       通过执行任务的参数时间(筛选范围的开始时间和结束时间)筛选出符合要求的数据。 聚合实现的思路是:根据时间筛选后的actionClick的数据映射成为Pair<SessionId,Row>的形式Row表示每一次点击行为。 然后根据SessionId进行分组对于分组后的数据根据SessionId粒度进行聚合封装有价值的数据(搜索词,点击品类等) 在SessionId粒度聚合后查询出所有的用户将用户映射成为Pair<Long,Row>对于上诉两个RDD进行Join将点击信息和用户信息封装成String的数据格式 按照SessionId为Key需求信息为Value返回。        在进行聚合后,根据执行任务的相关参数进行进一步的筛选操作,比如在根据用户性别,职业,城市, 搜索词点击的品类id进行筛选。

统计各个范围内的Session和步长占比

       实现自定义Accumulator,在自定义Accumulator中实现各个范围内的操作这里是一个统一的操作。 在上面的根据需求进行筛选的时候我们可以利用上述的过程进行代码重构计算每一个Session的访问时长在判断完符合条件后可以利用自定义的Accumulator 进行各个范围的SessionId还有就是各个范围的步长通过得到每个范围的数量后计算出来占比然后插入到数据库。

随机抽取100个session

       利用上面Session粒度的聚合代码进行代码重构计算出每一个Session的开始时间和结束时间。 将过滤后的RDD映射成为Pair形式Key为Date_Hour,Value为需求信息然后将这个Pair按照日期和时间划分也就是 Map<String,Map<String,Long>> dateHourCount,日期作为Key时间和数量作为Map计算总的Session个数然后每一天平摊100个在根据每天的数量 计算每一个小时的个数然后调用一个随机的函数获得随机索引然后再遍历过滤需求如果找到随机索引对应的信息那么将信息保存在一个List里面批量插入 之后将得到的SessionId按照join过滤后的数据然后分区进行批量插入(这也是性能优化之一)。

获取热门品类Top10

       想将按照需求过滤后的数据和按照时间过滤后的数据进行Join操作得到完整的数据也就是每一次点击的 行为还有用户的特征然后获取点击、下单和支付的品类Id注意这里需要去重然后分别计算点击、下单和支付品类的各个Id和次数将上一次得到的品类id和这三步 相LeftOuterJoin最后的得到一个RDD这个RDD进行map后放入我们自定义的二次排序类然后将数据后批量插入到数据库。

获取每一个热门品类的Top10Session

       根据上面获取Top10的品类Id然后根据以往筛选的数据计算每一个用户对于品类的点击次数然后和 Top10的数据相Join然后计算每一个品类的点击次数在根据CategoryId进行分组拿到TopN的session数据插入数据库。

用户访问Session的比较高端技术

自定义Accumulator

         使用自定义Accumulator降低维护成本一个就可以搞定很多业务需求

性能调优篇

性能调优之在实际项目中分配更多资源

性能调优的王道增加和分配更多的资源性能和速度上的调优是显而易见的基本上在一定范围内增加资源与性能的提升是成正比的写完一个spark作业以后 ,进行性能调优。 1.分配那些资源? executor CPU per executor memory per executor

2.在哪里分配这些资源? 提交shell脚本的时候

3.怎么调优,以及调优的原则? 第一种spark standalone公司集群上搭建一套spark集群技术心里应该清楚。 第二种yarn资源调度。应该去查看你的spark作业要提交到资源队列技术大概有多少资源 一个原则,你能使用的资源能有多大,就尽量调节到最大。

4.为什么多分配这些资源以后我性能会得到提升?

性能调优之Spark并行度

Spark并行度其实就是指的是spark作业中各个stage的task数量也就代表了spark作业的各个阶段的并行度。

如果不调节,并行度,导致并行度过低,会怎么样?

你的资源虽然分配足够了,但是问题是并行度没有与资源想匹配,导致你的资源分配浪费。

合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源。

1.task数量至少设置成与spark application的总CPU core数量相同

2.官方推荐task数量设置成为spark application总CPU core数量的2-3倍

实际情况与理想情况不同的有些task会运行快一点有些task可能会慢一点如果你的task设置的和CPU core数量相同可能会导致资源浪费。

3.如何设置一个 spark application的并行度 spark.default.parallelism参数在conf中设置。

性能调优之RDD重构和持久化

1.默认情况下多次对一个RDD执行算子去获取不同的RDD都会对这个RDD以及以前的父RDD全部重新计算一次。

对于这种情况是一定要避免的一旦出现一个RDD重复计算就会导致性能急剧降低。

2.RDD架构重构优化

尽量去复用RDD差不多的RDD可以抽取成为一个共同的RDD供后面的RDD计算反复使用。

公共的RDD一定要实现持久化。持久化也就是说将RDD的数据缓存到内存或者磁盘中之后无论进行多少次计算都直接取这个RDD的持久化的数据。

持久化,是可以进行序列化的。如果正常将持久化在内存,那么可能会导致内存的占用过大,这样的话,会知道内存溢出。

当内存无法支持公共RDD数量完全存放的时候就优先考虑使用序列化的方式在存内存存储。序列化的方式唯一的缺点是在获取数据的时候需要到反序列化。

如果序列化纯内存,只能内存+磁盘的序列化方式。

为了数据的高可靠性,而且内存充足可以使用双副本,进行持久化。持久化的双副本机制,因为机器宕机了,副本就丢了,需要重复机制,但是这样是针对你的资源很充分。

性能调优之在实际项目中广播大变量

默认情况下task执行的算子中使用外部的变量每个task都会获取一份变量有什么缺点在什么情况下会出现性能上的优劣影响

每一个task出现一份变量极其消耗内存有可能导致堆内存不足频繁GC以及RDD持久化部分写入到磁盘从而导致磁盘IO的消耗等。

如何解决上述性能影响呢?

广播变量。广播变量在driver上会有一份初始的副本第一个executor都有一个BlockManager负责管理某个内存和磁盘上的数据

这个会在driver上拉去相应的广播变量有可能会从远层的driver上获取变量副本也有可能从距离比较近的其他节点获取。

广播变量的好处不是每一个task一个变量副本而是每一个executor一个变量副本这样减少网络传输数据也给减少了内存存储。

性能优化之在实际项目中使用Kryo序列化

默认情况下spark内部使用java的序列化机器objectOutPutStream/objectInPutStream对象输入输出流机制

通过这种机制序列化,这种默认的序列化机制好处在于不必手动,但是缺点在于效率不高,占用内存比较大,。

Kryo序列化机制速度快乐序列化之后数据更小大概是java序列化机制的1/10在序列化之后可以让网络传输的数据变小内存资源也变小。

kyro序列化机制: 1.算子函数中使用的外部变量 2.持久化RDD进行序列化 3.shuffle过程

性能优化之Json数据格式优化

FastUtil是扩展了Java标准集合框架的类库提供了特定类型的Map,Set,List,QueueArrayList,HashMap能够提供更小的内存占用更快的存取速度

也提供了64位的Array,Set,List以及快速的和实用的IO类处理二进制和文本文件。最新版本要求Java7以及以上版本。我们使用FasyUtil提供的集合类

来替代自己平时使用的JDK的原生的MapList,Set好处在于FastUtil集合类可以减少内存的占用并且在集合遍历根据索性获取元素的值和设置元素的值

提供更快的存取速度。除了对象和原始类型fastutil也提供引用类型的支持但是使用等于号进行比较的而不是equals方法。

Spark应用fastutil

1.如果算子函数使用了外部变量那么第一你可以使用Broadcast广播变量优化第二可以使用kyro序列化类库提升序列化性能和效率

第三如果外部变量是某种比较大的集合那么可以考虑使用fastutil改写外部变量首先从源头上减少内存的占比通过广播变量进一步减少内存占用

通过kyro序列化类库进一步减少内存占比。

性能调优之调节数据本地化等待时间

spark的task分配算法优先会希望每个task正好分配到它要计算的数据所在的节点这样就不用在网络间传输数据。但是一般是事与愿违通常spark还会等待一段时间默认情况下是3秒如果不行就会选择这个比较差的本地化级别比如说将task分配到靠它要计算的数据所在的节点在比较近的一个节点然后进行计算。 如果发生数据传输task会通过其所在节点的BlockManager来获取数据通过一个getRemote方法通过网络传输组件从数据所在的节点的BlockManager中获取数据通过网络传输回task所在的节点进行计算。

最佳情况task和BlockManager直接在一个executor进程内走内存速度最佳;同一机架,不在一个节点,需要网络传输;在一个节点多个executor之间数据传输;不同机架,跨机架之间的网络传输,这种情况对性能的影响非常大。

PROCESS_LOCAL:进程本地化代码和数据在同一进程也就是同一个executor计算的task由executor执行BlockManager中有数据性能最好。

NODE_LOCAL:节点本地化,代码和数据在同一节点,但是数据在不同进程。

RACK_LOCAL:在一个机架上,需要跨节点拉去数据。

ANY:跨机架拉去数据。

spark.locality.wait默认是3s。

在什么时候调节数据本地化参数呢?

观察spark作业的运行日志推荐首先使用本地模式在日志中会显示数据本地化级别大多数是PROCESS_LOCAL调节。 如果是其他最好是调节一下数据本地化的等地时长。需要反复调节每次调节完再运行观察日志。看看大部分task的本地化级别有没有提提升看看整个spark作业运行时间有没有缩短。

怎么调节?

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

在SparkConf中设置即可

JVM调优原理之降低cache操作的内存比

有哪些调优?

1.常规性能调优,分配资源,并行度

2.JVM调优JVM相关的参数通常情况下如果你的硬件配置基础的JVM的配置通常都不会造成太严重的性能问题。主要是在线上故障中JVM占很重要的地位。

3.shuffle调优spark在执行groupbykey,reducebykey等操作时shuffle环节很重要shuffle调优其实对spark作业的性能的影响相当高基本上shuffle的性能消耗占用整个spark的50及以上。

4.spark操作的调优gourpbykey,countbykey来重构有些算子性能是比其他算子的性能要高

Spark中堆内存又被划分成为两部分一块是专门用来给RDD的cache,persist进行数据缓存用的还有一块是用来算子运算的存放函数中自己创建的对象。

默认情况下给算子cache操作的内存占比是0.6也就是用于算子做算的只占有0.4。如果出现频道的GC如果cache操作很充足那么就可以调节一下占比降低

cache操作的内存占比大不了用persist操作选择将缓存的数据写入磁盘配合序列化方式减少算子缓存内存占比。

一句话让task执行算子函数有更多的内存可以是使用。可以使用参数spark.storage.memoryFraction进行调节默认是0.6。

JVM调优之调节executor堆外内存之连接等待时长

有时候如果你的spark作业处理的数据量特别大的几亿数据量然后作业一运行时不时的保存shuffle file cannot findtask lost oom。

可以说你的executor的堆外内存不足够导致executor在运行的过程中可能会内存溢出然后导致后续的stage的task在运行的时候可能要从一些

executor中拉去shuffle map output文件但是executor可能会挂掉关联的block manager也没有了可能会报shuffle output not found

等spark作业会彻底崩溃。

上述情况可以考虑调节executor的堆外内存占比此外有时堆外内存调节的比较的时候对于性能的提升会有一定的提升。

如何调节?

在spark-submit的脚本中去用--conf的方式去添加配置。

spark.yarn.executor.memoryOverhead参数针对的的是yarn的提交方式。

通过这个参数调节以后,可以避免以上问题的避免。

对于等待时间就是在出现上述错误的时候连接不上拉去数据的Block Manager就会出现这个问题我们需要在spark-submit脚本中配置等待时长默认是60秒。

Shuffle调优之shuffle原理

什么样的情况下会发生shuffle?

在spark中主要是以下几个算子groupByKey、reduceByKey、countByKey、join等等。

什么是shuffle?

groupByKey要把分布在集群各个节点上的数据中的同一个key对应的values都给集中到一块儿集中到集群中同一个节点上更严密一点说就是集中到一个节点的一个executor的一个task中。

然后呢集中一个key对应的values之后才能交给我们来进行处理<key, Iterable>reduceByKey算子函数去对values集合进行reduce操作最后变成一个valuecountByKey需要在一个task中获取到一个key对应的所有的value然后进行计数统计总共有多少个valuejoinRDD<key, value> RDD<key, value>只要是两个RDD中key相同对应的2个value都能到一个节点的executor的task中给我们进行处理。

shuffle示意图 shuffle示意图

1.每一个shuffle的前半部分stage的task每个task都会创建下一个stage的task数量相同的文件比如下一个stage会有100个task那么当前stage每个task都会创建100份文件会将同一个key对应的values一定是写入同一个文件中的不同节点上的task也一定会将同一个key对应的values写入下一个stage同一个task对应的文件中。

shuffle的后半部分stage的task每个task都会从各个节点上的task写的属于自己的那一份文件中拉取key, value对然后task会有一个内存缓冲区然后会用HashMap进行key, values的汇聚(key ,values)

task会用我们自己定义的聚合函数比如reduceByKey(+)把所有values进行一对一的累加聚合出来最终的值。就完成了shuffle

2.shuffle一定是分为两个stage来完成的。因为这其实是个逆向的过程不是stage决定shuffle是shuffle决定stage。

reduceByKey(+)在某个action触发job的时候DAGScheduler会负责划分job为多个stage。划分的依据就是如果发现有会触发shuffle操作的算子比如reduceByKey就将这个操作的前半部分以及之前所有的RDD和transformation操作划分为一个stageshuffle操作的后半部分以及后面的直到action为止的RDD和transformation操作划分为另外一个stage。

3.shuffle前半部分的task在写入数据到磁盘文件之前都会先写入一个一个的内存缓冲内存缓冲满溢之后再spill溢写到磁盘文件中。

Shuffle调优之合并map端输出文件

如果不合并map端输出文件会怎么样?

前置条件每个executor有2个cpu core。4个task。task是线程执行的。所以先并行跑2个task再跑剩下2个task。 合并map端输出文件 问题来了默认的这种shuffle行为对性能有什么样的恶劣影响呢 实际生产环境的条件: 100个节点每个节点一个executor100个executor 每个executor2个cpu core 总共1000个task每个executor平均10个task

每个节点10个task每个节点会输出多少份map端文件10 * 1000=1万个文件

总共有多少份map端输出文件100 * 10000 = 100万。

shuffle中的写磁盘的操作基本上就是shuffle中性能消耗最为严重的部分。

通过上面的分析一个普通的生产环境的spark job的一个shuffle环节会写入磁盘100万个文件。

磁盘IO对性能和spark作业执行速度的影响是极其惊人和吓人的。

基本上spark作业的性能都消耗在shuffle中了虽然不只是shuffle的map端输出文件这一个部分但是这里也是非常大的一个性能消耗点。

开启map端输出文件合并

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

开启shuffle map端输出文件合并的机制默认情况下是不开启的就是会发生如上所述的大量map端输出文件的操作严重影响性能。 map端文件合并 开启了map端输出文件的合并机制之后

第一个stage同时就运行cpu core个task比如cpu core是2个并行运行2个task每个task都创建下一个stage的task数量个文件

第一个stage并行运行的2个task执行完以后就会执行另外两个task另外2个task不会再重新创建输出文件而是复用之前的task创建的map端输出文件将数据写入上一批task的输出文件中。

第二个stagetask在拉取数据的时候就不会去拉取上一个stage每一个task为自己创建的那份输出文件了而是拉取少量的输出文件每个输出文件中可能包含了多个task给自己的map端输出。 提醒一下map端输出文件合并

只有并行执行的task会去创建新的输出文件下一批并行执行的task就会去复用之前已有的输出文件但是有一个例外比如2个task并行在执行但是此时又启动要执行2个task那么这个时候的话就无法去复用刚才的2个task创建的输出文件了而是还是只能去创建新的输出文件。

要实现输出文件的合并的效果必须是一批task先执行然后下一批task再执行才能复用之前的输出文件负责多批task同时起来执行还是做不到复用的。 开启了map端输出文件合并机制之后生产环境上的例子会有什么样的变化

实际生产环境的条件: 100个节点每个节点一个executor100个executor 每个executor2个cpu core 总共1000个task每个executor平均10个task

每个节点2个cpu core有多少份输出文件呢2 * 1000 = 2000个 总共100个节点总共创建多少份输出文件呢100 * 2000 = 20万个文件

相比较开启合并机制之前的情况100万个

map端输出文件在生产环境中立减5倍 合并map端输出文件对咱们的spark的性能有哪些方面的影响呢

1、map task写入磁盘文件的IO减少100万文件 -> 20万文件

2、第二个stage原本要拉取第一个stage的task数量份文件1000个task第二个stage的每个task都要拉取1000份文件走网络传输

合并以后100个节点每个节点2个cpu core第二个stage的每个task主要拉取100 * 2 = 200个文件即可网络传输的性能消耗是不是也大大减少

分享一下实际在生产环境中使用了spark.shuffle.consolidateFiles机制以后实际的性能调优的效果对于上述的这种生产环境的配置

性能的提升还是相当的客观的。spark作业5个小时 -> 2~3个小时。

大家不要小看这个map端输出文件合并机制。实际上在数据量比较大你自己本身做了前面的性能调优executor上去->cpu core上去->并行度task数量上去 shuffle没调优shuffle就很糟糕了大量的map端输出文件的产生。对性能有比较恶劣的影响。

这个时候,去开启这个机制,可以很有效的提升性能。

Shuffle调优之调节map端内存缓冲与reduce端占比

未调优之前

spark.shuffle.file.buffer默认32k

spark.shuffle.memoryFraction0.2

map端内存缓冲reduce端内存占比很多资料、网上视频都会说这两个参数是调节shuffle性能的不二选择很有效果的样子实际上不是这样的。

以实际的生产经验来说这两个参数没有那么重要往往来说shuffle的性能不是因为这方面的原因导致的

但是有一点点效果的broadcast数据本地化等待时长这两个shuffle调优的小点其实也是需要跟其他的大量的小点配合起来使用一点一点的提升性能

最终很多个性能调优的小点的效果,汇集在一起之后,那么就会有可以看见的还算不错的性能调优的效果 map端内存缓冲与reduce端 原理描述

reduce端task在拉取到数据之后会用hashmap的数据格式来对各个key对应的values进行汇聚。

针对每个key对应的values执行我们自定义的聚合函数的代码比如_ + _把所有values累加起来

reduce task在进行汇聚、聚合等操作的时候实际上使用的就是自己对应的executor的内存executorjvm进程默认executor内存中划分给reduce task进行聚合的比例是0.2。

问题来了因为比例是0.2所以理论上很有可能会出现拉取过来的数据很多那么在内存中放不下这个时候默认的行为就是说将在内存放不下的数据都spill溢写到磁盘文件中去。

原理说完之后,来看一下,默认情况下,不调优,可能会出现什么样的问题?

默认map端内存缓冲是每个task32kb。

默认reduce端聚合内存比例是0.2也就是20%。

如果map端的task处理的数据量比较大但是呢你的内存缓冲大小是固定的。可能会出现什么样的情况

每个task就处理320kb32kb总共会向磁盘溢写320 / 32 = 10次。 每个task处理32000kb32kb总共会向磁盘溢写32000 / 32 = 1000次。

在map task处理的数据量比较大的情况下而你的task的内存缓冲默认是比较小的32kb。可能会造成多次的map端往磁盘文件的spill溢写操作发生大量的磁盘IO从而降低性能。

reduce端聚合内存占比。默认是0.2。如果数据量比较大reduce task拉取过来的数据很多那么就会频繁发生reduce端聚合内存不够用频繁发生spill操作

溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。

默认不调优在数据量比较大的情况下可能频繁地发生reduce端的磁盘文件的读写。

这两个点之所以放在一起讲是因为他们俩是有关联的。数据量变大map端肯定会出点问题reduce端肯定也会出点问题出的问题是一样的都是磁盘IO频繁变多影响性能。

优化思路

调节map task内存缓冲spark.shuffle.file.buffer默认32kspark 1.3.x不是这个参数后面还有一个后缀kbspark 1.5.x以后

变了就是现在这个参数调节reduce端聚合内存占比spark.shuffle.memoryFraction0.2

在实际生产环境中,我们在什么时候来调节两个参数?

看Spark UI如果你的公司是决定采用standalone模式那么狠简单你的spark跑起来会显示一个Spark UI的地址8080的端口进去看依次点击进去

可以看到你的每个stage的详情有哪些executor有哪些task每个task的shuffle write和shuffle read的量shuffle的磁盘和内存读写的数据量

如果是用的yarn模式来提交课程最前面从yarn的界面进去点击对应的application进入Spark UI查看详情。

如果发现shuffle 磁盘的write和read很大。这个时候就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer每次扩大一倍然后看看效果64128spark.shuffle.memoryFraction

每次提高0.1,看看效果。

不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。

调节了以后效果map task内存缓冲变大了减少spill到磁盘文件的次数reduce端聚合内存变大了减少spill到磁盘的次数而且减少了后面聚合

读取磁盘文件的数量。

troubleshooting

troubleshooting之控制shuffle reduce端缓冲大小避免OOM

shuffle过程中优map端的task是不断的输出数据的数据量可能是很大的但是其实reduce端的task并不是等到map端task将属于自己的那份数据全部

写入磁盘文件之后再拉去的map端写一点数据reduce端task就会拉去一部分数据立即进行后面的聚合算子函数的应用。

每次reduce能够拉去多少数据就是由buffer来决定的因为拉去过来的数据都是先放在buffer中的然后才用后面的executor分配的堆内存占比

hashmap去进行后续的聚合函数执行。

reduce端缓冲可能出现什么问题

默认48MBreduce端task一边拉取一边计算不一定一直会拉满48MB的数据可能大多数情况下拉取10MB就计算掉了。

大多数时候也不会出现问题有些时候map端的数据量特别大然后写出的数据特别快reduce端所有的task拉去的时候全部到达自己极限值。

这个时候加上你的reduce端执行的聚合函数的代码就可能创建大量的对象一下子内存就出现OOM。reduce端的内存中就出现了内存溢出。

怎么解决呢?

减少reduce端task缓冲大小这样就不容易出现OOM问题了。

spark作业首先第一要义就是一定要让它跑起来然后再考虑性能。

关于reduce端缓冲大小的另外一面关于性能调优:

如果资源特别充分可以尝试增加reduce端缓冲大小这样就可以减少拉取次数减少网络传输。

配置的参数spark.reducer.maxSizeInflight

troubleshooting之shuffle文件拉取失败

有时候会出现一种情况,非常普遍;shuffle file cannot find在spark的作业中这是非常普遍而且有时候他会偶尔出现但是重现提交task后

这种现象又不会出现可以考虑是某一个executor在执行GC但是下一个stage的executor需要拉去该task中的数据这就导致了还现象的发生。

spark.shuffle.io.maxRetries 3 这个参数表示shuffle文件拉取的时候如果没有拉取到最多或者重试几次默认是3次。

spark.shuffle.io.retryWait 5s 这个参数的意思是每一次拉取文件的时间间隔默认是5s。

针对以上情况我们可以可以增大这两个参数的值达到比较大的一个值尽量保证第二个stage的task一定能够拉取到上一个stage的输出文件。避免出现上述错误。

troubleshooting之解决各种序列化导致的报错

你会看到什么样的序列化导致的报错?

用client模式提交spark作业观察本地打出的log如果出现Serializable,Serialize等字段报错的log那就出现了序列化问题导致的错误。

序列化报错注意的三个点:

1.算子函数中设置如果使用到自定义的类型,一定要序列化

2.如果将自定义的类型,作为算子的元素类型,那么自定义的类型必须是可以序列化的

3.不能在上述两种情况下,去使用一些第三方的不支持序列化的类型

troubleshooting之解决算子函数返回NULL的问题

如果碰到对于某些值,不想要有返回值的话,有一个解决的办法:

1.在返回的时候返回或者特殊的值得不要返回null。

2.通过算子获取一个RDD后可以通过一些过滤操作进行数据过滤。

3.可以使用压缩算子提前性能。

数据倾斜

数据倾斜解决方案之原理以及现象分析

1.数据倾斜的原理

spark进行shuffle时由于数据分配不均匀导致某个Task的数据过大这个Task运行时间过长这就是数据倾斜。

2.数据倾斜的现象

spark数据倾斜有两种表现:

1.大部分的task都执行的特别特别快有几个task执行的特别特别慢前面的task一般1s可以执行完5个后面发现1000个task999task要执行1小时或者2小时才能执行完一个task

2.运行的时候其他task执行造成没有什么问题但是有的task就突然出现OOMtask failedtask lost反复执行几次都是某个task跑不通最后挂掉。

3.数据倾斜的产生原因与定位

根据log去定位出现出现数据倾斜的原因基本只可能因为出现了shuffle操作的在shuffle的过程中出现了数据倾斜的问题。因为某个或者某些key对应的数据远高于其他key。

1.在程序中找到产生shuffle得算子

2.看loglog一般会报你的哪一行代码导致OOM异常看看执行到第几个stage。

数据倾斜解决方案之聚合源数据

解决数据倾斜的方案:

1.聚合源数据

2.过滤导致倾斜的Key

聚合源数据的思路:

spark作业的数据来源通常是从hive表。对于spark作业的输入源头可以将同一key的数据进行拼接对于这种操作之后可能就没有shuffle操作了何来数据倾斜

如果没法对每一个key进行聚合出来一条数据。可以对于数据进行粗粒度的聚合比如根据时间或者地域进行聚合尽量去聚合减少每个key的数量也许聚合到比较粗的粒度后原来的数据减少了减轻了数据倾斜的现象和问题。

数据倾斜解决方案之提高shuffle操作reduce端的并行度

将reduce task的数量变多就可以让每个reduce task分配到更少的数据量这样的话也许就可以缓解或者甚至是解决数据倾斜的问题

提升shuffle reduce端并行度怎么操作

在调用的时候传入进入一个参数这个数字表示reduce端的并行度。

这个方案只是缓解了数据倾斜的问题。