diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 1f53974..d0ee455 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,12 +2,7 @@ - - - - - @@ -24,13 +19,18 @@ - - + + - - - + + + + + + + + @@ -39,8 +39,8 @@ - - + + @@ -50,8 +50,8 @@ - - + + @@ -69,6 +69,13 @@ + + + sessionAggrStatAccumulator + commonFullClickInfoRDD + SessionDe + + @@ -122,12 +129,12 @@ @@ -549,7 +556,7 @@ - + 1529592741848 @@ -601,7 +608,7 @@ - @@ -609,6 +616,7 @@ + @@ -632,7 +640,6 @@ - @@ -705,18 +712,11 @@ - - - - - - - - @@ -1023,14 +1023,6 @@ - - - - - - - - @@ -1060,11 +1052,27 @@ + + + + + + + + + + + + + + + + - - + + @@ -1073,13 +1081,18 @@ - - + + - - - + + + + + + + + diff --git a/README.md b/README.md index 26e5368..ae843a2 100644 --- a/README.md +++ b/README.md @@ -348,8 +348,85 @@ Map> dateHourCount,日期作为Key,时间和数量作        想将按照需求过滤后的数据和按照时间过滤后的数据进行Join操作,得到完整的数据,也就是每一次点击的 行为还有用户的特征,然后获取点击、下单和支付的品类Id,注意这里需要去重,然后分别计算点击、下单和支付品类的各个Id和次数,将上一次得到的品类id和这三步 相LeftOuterJoin最后的得到一个RDD,这个RDD进行map后放入我们自定义的二次排序类,然后将数据后批量插入到数据库。 -### 获取 +### 获取每一个热门品类的Top10Session +       根据上面获取Top10的品类Id,然后根据以往筛选的数据,计算每一个用户对于品类的点击次数,然后和 +Top10的数据相Join然后计算每一个品类的点击次数,在根据CategoryId进行分组,拿到TopN的session数据插入数据库。 ## 用户访问Session的比较高端技术 ### 自定义Accumulator -         使用自定义Accumulator降低维护成本,一个就可以搞定很多业务需求 \ No newline at end of file +         使用自定义Accumulator降低维护成本,一个就可以搞定很多业务需求 + +## 性能调优篇 +### 性能调优之在实际项目中分配更多资源 +性能调优的王道,增加和分配更多的资源,性能和速度上的调优,是显而易见的,基本上在一定范围内,增加资源与性能的提升,是成正比的,写完一个spark作业以后 +,进行性能调优。 +1.分配那些资源? + executor, CPU per executor, memory per executor + +2.在哪里分配这些资源? 提交shell脚本的时候 + +3.怎么调优,以及调优的原则? + 第一种spark standalone,公司集群上,搭建一套spark集群技术心里应该清楚。 + 第二种,yarn,资源调度。应该去查看,你的spark作业,要提交到资源队列技术大概有多少资源? +一个原则,你能使用的资源能有多大,就尽量调节到最大。 + +4.为什么多分配这些资源以后我性能会得到提升? +### 性能调优之Spark并行度 +Spark并行度,其实就是指的是spark作业中,各个stage的task数量,也就代表了spark作业的各个阶段的并行度。 + +如果不调节,并行度,导致并行度过低,会怎么样? + + 你的资源虽然分配足够了,但是问题是并行度没有与资源想匹配,导致你的资源分配浪费。 + +合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源。 + +1.task数量,至少设置成与spark application的总CPU core数量相同 + + +2.官方推荐,task数量,设置成为spark application总CPU core数量的2-3倍, + +实际情况,与理想情况不同的,有些task会运行快一点,有些task可能会慢一点,如果你的task设置的和CPU core数量相同,可能会导致资源浪费。 + +3.如何设置一个 spark application的并行度? spark.default.parallelism参数,在conf中设置。 +### 性能调优之RDD重构和持久化 +1.默认情况下,多次对一个RDD执行算子,去获取不同的RDD,都会对这个RDD以及以前的父RDD,全部重新计算一次。 + +对于这种情况,是一定要避免的,一旦出现一个RDD重复计算,就会导致性能急剧降低。 + +2.RDD架构重构优化 + + 尽量去复用RDD,差不多的RDD,可以抽取成为一个共同的RDD供后面的RDD计算,反复使用。 + +公共的RDD一定要实现持久化。持久化也就是说,将RDD的数据缓存到内存或者磁盘中,之后无论进行多少次计算,都直接取这个RDD的持久化的数据。 + + 持久化,是可以进行序列化的。如果正常将持久化在内存,那么可能会导致内存的占用过大,这样的话,会知道内存溢出。 + + 当内存无法支持公共RDD数量完全存放的时候,就优先考虑,使用序列化的方式在存内存存储。序列化的方式,唯一的缺点是在获取数据的时候,需要到反序列化。 + + 如果序列化纯内存,只能内存+磁盘的序列化方式。 + + 为了数据的高可靠性,而且内存充足可以使用双副本,进行持久化。持久化的双副本机制,因为机器宕机了,副本就丢了,需要重复机制,但是这样是针对你的资源很充分。 +### 性能调优之在实际项目中广播大变量 + + 默认情况下,task执行的算子中,使用外部的变量,每个task都会获取一份变量,有什么缺点?在什么情况下会出现性能上的优劣影响? + + 每一个task出现一份变量,极其消耗内存,有可能导致堆内存不足,频繁GC,以及RDD持久化部分写入到磁盘,从而导致磁盘IO的消耗等。 + +如何解决上述性能影响呢? + + 广播变量。广播变量在driver上会有一份初始的副本,第一个executor都有一个BlockManager负责管理某个内存和磁盘上的数据, + +这个会在driver上拉去相应的广播变量,有可能会从远层的driver上获取变量副本,也有可能从距离比较近的其他节点获取。 + +广播变量的好处,不是每一个task一个变量副本,而是每一个executor一个变量副本,这样减少网络传输数据,也给减少了内存存储。 +### 性能优化之在实际项目中使用Kryo序列化 +默认情况下,spark内部使用java的序列化机器,objectOutPutStream/objectInPutStream对象输入输出流机制, + +通过这种机制序列化,这种默认的序列化机制好处在于不必手动,但是缺点在于效率不高,占用内存比较大,。 + +Kryo序列化机制,速度快乐序列化之后数据更小,大概是java序列化机制的1/10,在序列化之后,可以让网络传输的数据变小,内存资源也变小。 + +kyro序列化机制: + 1.算子函数中使用的外部变量 + 2.持久化RDD进行序列化 + 3.shuffle过程 \ No newline at end of file diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java index d69a414..ae867a7 100644 --- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java +++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java @@ -19,6 +19,7 @@ import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.hive.HiveContext; +import org.apache.spark.storage.StorageLevel; import scala.Tuple2; import java.util.*; @@ -55,10 +56,14 @@ public class UserVisitAnalyze { //获取指定范围内的Sesssion JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject); + //这里增加一个新的方法,主要是映射 JavaPairRDD sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate); + //重复用到的RDD进行持久化 + sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY()); + //上面的两个RDD是 //按照Sesson进行聚合 - JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionRangeDate); + JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD); //通过条件对RDD进行筛选 // 重构,同时统计 @@ -67,9 +72,13 @@ public class UserVisitAnalyze { //在进行accumulator之前,需要aciton动作,不然会为空 JavaPairRDD filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator); - + //重复用到的RDD进行持久化 + filteredSessionRDD.persist(StorageLevel.DISK_ONLY()); //获取符合过滤条件的全信息公共RDD JavaPairRDD commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD); + + //重复用到的RDD进行持久化 + commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY()); //session聚合统计,统计出访问时长和访问步长的各个区间所占的比例 /** * 重构实现的思路: @@ -102,7 +111,7 @@ public class UserVisitAnalyze { //获取热门品类数据Top10 List> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD); //获取热门每一个品类点击Top10session - getTop10Session(context,taskId,commonFullClickInfoRDD,top10CategoryIds); + getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds); //关闭spark上下文 context.close(); } @@ -167,23 +176,26 @@ public class UserVisitAnalyze { /** * session粒度的聚合 * @param sc - * @param sessionRangeDate + * @param sessionInfoPairRDD * @return */ - private static JavaPairRDD aggregateBySessionId(SQLContext sc, JavaRDD sessionRangeDate) { + private static JavaPairRDD aggregateBySessionId(SQLContext sc, JavaPairRDD sessionInfoPairRDD) { /** * 先将数据映射成map格式 */ + /** + * + 代码重构 JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() { @Override public Tuple2 call(Row row) throws Exception { return new Tuple2(row.getString(2),row); } - }); + });*/ /** * 根据sessionId进行分组 */ - JavaPairRDD> sessionActionGrouped=sessionActionPair.groupByKey(); + JavaPairRDD> sessionActionGrouped=sessionInfoPairRDD.groupByKey(); JavaPairRDD sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction>, Long, String>() { @Override @@ -875,7 +887,7 @@ public class UserVisitAnalyze { //获取每一个品类的Session Top10 - private static void getTop10Session(JavaSparkContext sc, Long taskId, JavaPairRDD commonFullClickInfoRDD, List> top10CategoryIds) { + private static void getTop10Session(JavaSparkContext sc, final Long taskId, JavaPairRDD sessionInfoPairRDD, List> top10CategoryIds) { List> categoryIdList=new ArrayList>(); for(Tuple2 top10CategoryId:top10CategoryIds) { @@ -886,7 +898,7 @@ public class UserVisitAnalyze { //生成一份RDD JavaPairRDD top10CategoryIdsRDD=sc.parallelizePairs(categoryIdList); //按照SessionId进行分组 - JavaPairRDD> gourpBySessionIdRDD=commonFullClickInfoRDD.groupByKey(); + JavaPairRDD> gourpBySessionIdRDD=sessionInfoPairRDD.groupByKey(); //计算每一个session对品类的点击次数 JavaPairRDD categorySessionCountRDD=gourpBySessionIdRDD.flatMapToPair(new PairFlatMapFunction>, Long, String>() { @Override @@ -929,12 +941,85 @@ public class UserVisitAnalyze { //根据品类分组 JavaPairRDD> top10CategorySessionCountGroupRDD=top10CategorySessionCountRDD.groupByKey(); - JavaPairRDD top10CategorySessionRDD=top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction>, Long,String>() { + JavaPairRDD top10CategorySessionRDD=top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction>, String,String>() { @Override - public Iterable> call(Tuple2> tuple2) throws Exception { + public Iterable> call(Tuple2> tuple2) throws Exception { List top10CategorySessionList=new ArrayList(); + Long categoryId=tuple2._1; + String[] top10Sessions=new String[10]; + List> sessionIdList=new ArrayList>(); + for (String sessionCount:tuple2._2) + { + String[] sessionCountSplited=sessionCount.split(","); + //String sessionId=sessionCountSplited[0]; + Long count=Long.valueOf(sessionCountSplited[1]); + //获取TopN + for(int i=0;i_count) + { + for (int j = 9; j>i ; j--) { + top10Sessions[j]=top10Sessions[j-1]; + } + top10Sessions[i]=sessionCount; + break; + } + } + } + } + //封装数据 + for (int i=0;i(sessionId,sessionId)); + } + } + //批量插入数据库 + DaoFactory.getTop10CategorySessionDao().batchInsert(top10CategorySessionList); + return sessionIdList; + } + }); - return null; + + //3. 获取session的明细数据保存到数据库 + JavaPairRDD> sessionDetailRDD= top10CategorySessionRDD.join(sessionInfoPairRDD); + sessionDetailRDD.foreachPartition(new VoidFunction>>>() { + @Override + public void call(Iterator>> tuple2Iterator) throws Exception { + List sessionDetailList=new ArrayList(); + while(tuple2Iterator.hasNext()) + { + Tuple2> tuple2=tuple2Iterator.next(); + Row row=tuple2._2._2; + String sessionId=tuple2._1; + Long userId=row.getLong(1); + Long pageId=row.getLong(3); + String actionTime=row.getString(4); + String searchKeyWard=row.getString(5); + Long clickCategoryId=row.getLong(6); + Long clickProducetId=row.getLong(7); + String orderCategoryId=row.getString(8); + String orderProducetId=row.getString(9); + String payCategoryId=row.getString(10); + String payProducetId=row.getString(11); + SessionDetail sessionDetail=new SessionDetail(); + sessionDetail.set(taskId,userId,sessionId,pageId,actionTime,searchKeyWard,clickCategoryId,clickProducetId,orderCategoryId,orderProducetId,payCategoryId,payProducetId); + sessionDetailList.add(sessionDetail); + } + DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList); } }); }