性能调优

main
oeljeklaus-you 7 years ago
parent 9e9e1e410d
commit 0480f96753

@ -2,12 +2,7 @@
<project version="4">
<component name="ChangeListManager">
<list default="true" id="a7505764-040b-48e2-b2fc-8c5b579e595f" name="Default" comment="">
<change beforePath="" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/Top10CategorySessionDao.java" />
<change beforePath="" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/impl/Top10CategorySessionDaoImpl.java" />
<change beforePath="" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/domain/Top10CategorySession.java" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" afterPath="$PROJECT_DIR$/.idea/workspace.xml" />
<change beforePath="$PROJECT_DIR$/README.md" afterPath="$PROJECT_DIR$/README.md" />
<change beforePath="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java" />
<change beforePath="$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java" />
</list>
<ignored path="$PROJECT_DIR$/out/" />
@ -24,13 +19,18 @@
<file leaf-file-name="UserVisitAnalyze.java" pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="535">
<caret line="935" column="0" lean-forward="true" selection-start-line="935" selection-start-column="0" selection-end-line="935" selection-end-column="0" />
<state relative-caret-position="-320">
<caret line="972" column="29" lean-forward="true" selection-start-line="972" selection-start-column="29" selection-end-line="972" selection-end-column="29" />
<folding>
<element signature="imports" expanded="true" />
<element signature="e#45463#45830#0" expanded="true" />
<element signature="e#45751#45773#0" expanded="true" />
<element signature="e#45829#45830#0" expanded="true" />
<element signature="e#43845#45127#0" expanded="true" />
<element signature="e#44185#44197#0" expanded="true" />
<element signature="e#44739#44760#0" expanded="true" />
<element signature="e#45126#45127#0" expanded="true" />
<element signature="e#45846#48382#0" expanded="true" />
<element signature="e#46138#46160#0" expanded="true" />
<element signature="e#46333#46357#0" expanded="true" />
<element signature="e#48381#48382#0" expanded="true" />
</folding>
</state>
</provider>
@ -39,8 +39,8 @@
<file leaf-file-name="MockData.java" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/mockData/MockData.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="1615">
<caret line="100" column="18" lean-forward="false" selection-start-line="100" selection-start-column="18" selection-end-line="100" selection-end-column="18" />
<state relative-caret-position="281">
<caret line="93" column="50" lean-forward="true" selection-start-line="93" selection-start-column="50" selection-end-line="93" selection-end-column="50" />
<folding />
</state>
</provider>
@ -50,8 +50,8 @@
<entry file="file://$PROJECT_DIR$/README.md">
<provider selected="true" editor-type-id="split-provider[text-editor;markdown-preview-editor]">
<state split_layout="FIRST">
<first_editor relative-caret-position="512">
<caret line="347" column="23" lean-forward="false" selection-start-line="347" selection-start-column="23" selection-end-line="347" selection-end-column="23" />
<first_editor relative-caret-position="345">
<caret line="352" column="68" lean-forward="false" selection-start-line="352" selection-start-column="68" selection-end-line="352" selection-end-column="68" />
<folding />
</first_editor>
<second_editor />
@ -69,6 +69,13 @@
</list>
</option>
</component>
<component name="FindInProjectRecents">
<findStrings>
<find>sessionAggrStatAccumulator</find>
<find>commonFullClickInfoRDD</find>
<find>SessionDe</find>
</findStrings>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
@ -122,12 +129,12 @@
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/impl/Top10CategoryDaoImpl.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/SessionAggrStatDao.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/impl/SessionAggrStatDaoImpl.java" />
<option value="$PROJECT_DIR$/README.md" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/domain/Top10CategorySession.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/Top10CategorySessionDao.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/impl/Top10CategorySessionDaoImpl.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java" />
<option value="$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java" />
<option value="$PROJECT_DIR$/README.md" />
</list>
</option>
</component>
@ -549,7 +556,7 @@
<workItem from="1529717692529" duration="30310000" />
<workItem from="1529837434323" duration="208000" />
<workItem from="1529845795654" duration="7752000" />
<workItem from="1529887211212" duration="424000" />
<workItem from="1529887211212" duration="6828000" />
</task>
<task id="LOCAL-00001" summary="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成">
<created>1529592741848</created>
@ -601,7 +608,7 @@
</history-entry>
</component>
<component name="TimeTrackingManager">
<option name="totallyTimeSpent" value="70541000" />
<option name="totallyTimeSpent" value="76945000" />
</component>
<component name="ToolWindowManager">
<frame x="0" y="0" width="1440" height="900" extended-state="0" />
@ -609,6 +616,7 @@
<layout>
<window_info id="Palette" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="TODO" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="6" side_tool="false" content_ui="tabs" />
<window_info id="Messages" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32889965" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
<window_info id="Palette&#9;" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="Image Layers" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
<window_info id="Java Enterprise" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
@ -632,7 +640,6 @@
<window_info id="Message" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
<window_info id="Commander" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.4" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
<window_info id="Hierarchy" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.25" sideWeight="0.5" order="2" side_tool="false" content_ui="combo" />
<window_info id="Messages" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.32889965" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.4" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" show_stripe_button="true" weight="0.33" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
</layout>
@ -705,18 +712,11 @@
</component>
<component name="XDebuggerManager">
<breakpoint-manager>
<option name="time" value="30" />
<option name="time" value="32" />
</breakpoint-manager>
<watches-manager />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/src/test/java/cn/edu/hust/json/FastJsonTest.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="133">
<caret line="7" column="13" lean-forward="false" selection-start-line="7" selection-start-column="13" selection-end-line="7" selection-end-column="13" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/conf/ConfigurationManager.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="912">
@ -1023,14 +1023,6 @@
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/mockData/MockData.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="1615">
<caret line="100" column="18" lean-forward="false" selection-start-line="100" selection-start-column="18" selection-end-line="100" selection-end-column="18" />
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/dao/Top10CategorySessionDao.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="133">
@ -1060,11 +1052,27 @@
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/mockData/MockData.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="281">
<caret line="93" column="50" lean-forward="true" selection-start-line="93" selection-start-column="50" selection-end-line="93" selection-end-column="50" />
<folding />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/spark/spark-core_2.10/1.5.1/spark-core_2.10-1.5.1.jar!/org/apache/spark/api/java/JavaRDDLike.class">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="151">
<caret line="145" column="0" lean-forward="false" selection-start-line="145" selection-start-column="0" selection-end-line="145" selection-end-column="0" />
<folding />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/README.md">
<provider selected="true" editor-type-id="split-provider[text-editor;markdown-preview-editor]">
<state split_layout="FIRST">
<first_editor relative-caret-position="512">
<caret line="347" column="23" lean-forward="false" selection-start-line="347" selection-start-column="23" selection-end-line="347" selection-end-column="23" />
<first_editor relative-caret-position="345">
<caret line="352" column="68" lean-forward="false" selection-start-line="352" selection-start-column="68" selection-end-line="352" selection-end-column="68" />
<folding />
</first_editor>
<second_editor />
@ -1073,13 +1081,18 @@
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="535">
<caret line="935" column="0" lean-forward="true" selection-start-line="935" selection-start-column="0" selection-end-line="935" selection-end-column="0" />
<state relative-caret-position="-320">
<caret line="972" column="29" lean-forward="true" selection-start-line="972" selection-start-column="29" selection-end-line="972" selection-end-column="29" />
<folding>
<element signature="imports" expanded="true" />
<element signature="e#45463#45830#0" expanded="true" />
<element signature="e#45751#45773#0" expanded="true" />
<element signature="e#45829#45830#0" expanded="true" />
<element signature="e#43845#45127#0" expanded="true" />
<element signature="e#44185#44197#0" expanded="true" />
<element signature="e#44739#44760#0" expanded="true" />
<element signature="e#45126#45127#0" expanded="true" />
<element signature="e#45846#48382#0" expanded="true" />
<element signature="e#46138#46160#0" expanded="true" />
<element signature="e#46333#46357#0" expanded="true" />
<element signature="e#48381#48382#0" expanded="true" />
</folding>
</state>
</provider>

@ -348,8 +348,85 @@ Map<String,Map<String,Long>> dateHourCount,日期作为Key时间和数量作
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;先将按照需求过滤后的数据和按照时间过滤后的数据进行Join操作,得到完整的数据,也就是每一次点击的
行为还有用户的特征;然后获取点击、下单和支付的品类Id(注意这里需要去重),再分别计算点击、下单和支付品类的各个Id和次数,将上一步得到的品类id和这三步的结果
相LeftOuterJoin,最后得到一个RDD。这个RDD进行map后放入我们自定义的二次排序类,然后将数据批量插入到数据库。
### 获取
### 获取每一个热门品类的Top10Session
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;根据上面获取的Top10品类Id,再根据以往筛选的数据计算每一个用户对于品类的点击次数,然后和
Top10的数据相Join,计算每一个品类的点击次数,再根据CategoryId进行分组,拿到TopN的session数据插入数据库。
## 用户访问Session的比较高端技术
### 自定义Accumulator
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;使用自定义Accumulator降低维护成本一个就可以搞定很多业务需求
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;使用自定义Accumulator降低维护成本一个就可以搞定很多业务需求
## 性能调优篇
### 性能调优之在实际项目中分配更多资源
性能调优的王道增加和分配更多的资源性能和速度上的调优是显而易见的基本上在一定范围内增加资源与性能的提升是成正比的写完一个spark作业以后
,进行性能调优。
1.分配哪些资源?
executor CPU per executor memory per executor
2.在哪里分配这些资源? 提交shell脚本的时候
3.怎么调优,以及调优的原则?
第一种spark standalone公司集群上搭建一套spark集群技术心里应该清楚。
第二种yarn资源调度。应该去查看你的spark作业要提交到资源队列技术大概有多少资源
一个原则,你能使用的资源能有多大,就尽量调节到最大。
4.为什么多分配这些资源以后我性能会得到提升?
### 性能调优之Spark并行度
Spark并行度其实就是指的是spark作业中各个stage的task数量也就代表了spark作业的各个阶段的并行度。
如果不调节,并行度,导致并行度过低,会怎么样?
你的资源虽然分配足够了,但是问题是并行度没有与资源相匹配,导致你分配的资源被浪费。
合理的并行度的设置,应该是要设置的足够大,大到可以完全合理的利用你的集群资源。
1.task数量至少设置成与spark application的总CPU core数量相同
2.官方推荐task数量设置成为spark application总CPU core数量的2-3倍
实际情况与理想情况不同:有些task会运行快一点,有些task可能会慢一点,如果你的task数量设置得和CPU core数量相同,可能会导致资源浪费。
3.如何设置一个 spark application的并行度 spark.default.parallelism参数在conf中设置。
### 性能调优之RDD重构和持久化
1.默认情况下多次对一个RDD执行算子去获取不同的RDD都会对这个RDD以及以前的父RDD全部重新计算一次。
对于这种情况是一定要避免的一旦出现一个RDD重复计算就会导致性能急剧降低。
2.RDD架构重构优化
尽量去复用RDD差不多的RDD可以抽取成为一个共同的RDD供后面的RDD计算反复使用。
公共的RDD一定要实现持久化。持久化也就是说将RDD的数据缓存到内存或者磁盘中之后无论进行多少次计算都直接取这个RDD的持久化的数据。
持久化,是可以进行序列化的。如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,会导致内存溢出。
当内存无法支持公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。序列化的方式,唯一的缺点是,在获取数据的时候需要反序列化。
如果序列化纯内存的方式仍然放不下,就只能使用内存+磁盘的序列化方式。
为了数据的高可靠性,而且内存充足时,可以使用双副本机制进行持久化。如果只有一份持久化数据,一旦机器宕机,这份数据就丢了,需要重新计算;双副本机制可以避免这种情况,但是这种方式只适用于资源很充分的情况。
### 性能调优之在实际项目中广播大变量
默认情况下task执行的算子中使用外部的变量每个task都会获取一份变量有什么缺点在什么情况下会出现性能上的优劣影响
每一个task出现一份变量极其消耗内存有可能导致堆内存不足频繁GC以及RDD持久化部分写入到磁盘从而导致磁盘IO的消耗等。
如何解决上述性能影响呢?
广播变量。广播变量在driver上会有一份初始的副本,每一个executor都有一个BlockManager,负责管理某个内存和磁盘上的数据,
BlockManager会去拉取相应的广播变量:有可能会从远程的driver上获取变量副本,也有可能从距离比较近的其他节点获取。
广播变量的好处:不是每一个task一个变量副本,而是每一个executor一个变量副本,这样既减少了网络传输的数据,也减少了内存占用。
### 性能优化之在实际项目中使用Kryo序列化
默认情况下spark内部使用java的序列化机器objectOutPutStream/objectInPutStream对象输入输出流机制
通过这种机制序列化,这种默认的序列化机制好处在于不必手动,但是缺点在于效率不高,占用内存比较大,。
Kryo序列化机制速度快乐序列化之后数据更小大概是java序列化机制的1/10在序列化之后可以让网络传输的数据变小内存资源也变小。
kyro序列化机制:
1.算子函数中使用的外部变量
2.持久化RDD进行序列化
3.shuffle过程

@ -19,6 +19,7 @@ import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.*;
@ -55,10 +56,14 @@ public class UserVisitAnalyze {
//获取指定范围内的Sesssion
JavaRDD<Row> sessionRangeDate=getActionRDD(sc,jsonObject);
//这里增加一个新的方法,主要是映射
JavaPairRDD<String,Row> sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
//重复用到的RDD进行持久化
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
//上面的两个RDD是
//按照Sesson进行聚合
JavaPairRDD<String,String> sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionRangeDate);
JavaPairRDD<String,String> sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
//通过条件对RDD进行筛选
// 重构,同时统计
@ -67,9 +72,13 @@ public class UserVisitAnalyze {
//在进行accumulator之前需要aciton动作不然会为空
JavaPairRDD<String,String> filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
//重复用到的RDD进行持久化
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
//获取符合过滤条件的全信息公共RDD
JavaPairRDD<String, Row> commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
//重复用到的RDD进行持久化
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
//session聚合统计统计出访问时长和访问步长的各个区间所占的比例
/**
*
@ -102,7 +111,7 @@ public class UserVisitAnalyze {
//获取热门品类数据Top10
List<Tuple2<CategorySortKey,String>> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
//获取热门每一个品类点击Top10session
getTop10Session(context,taskId,commonFullClickInfoRDD,top10CategoryIds);
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
//关闭spark上下文
context.close();
}
@ -167,23 +176,26 @@ public class UserVisitAnalyze {
/**
* session
* @param sc
* @param sessionRangeDate
* @param sessionInfoPairRDD
* @return
*/
private static JavaPairRDD<String,String> aggregateBySessionId(SQLContext sc, JavaRDD<Row> sessionRangeDate) {
private static JavaPairRDD<String,String> aggregateBySessionId(SQLContext sc, JavaPairRDD<String, Row> sessionInfoPairRDD) {
/**
* map
*/
/**
*
JavaPairRDD<String,Row> sessionActionPair=sessionRangeDate.mapToPair(new PairFunction<Row, String,Row>() {
@Override
public Tuple2<String, Row> call(Row row) throws Exception {
return new Tuple2<String, Row>(row.getString(2),row);
}
});
});*/
/**
* sessionId
*/
JavaPairRDD<String,Iterable<Row>> sessionActionGrouped=sessionActionPair.groupByKey();
JavaPairRDD<String,Iterable<Row>> sessionActionGrouped=sessionInfoPairRDD.groupByKey();
JavaPairRDD<Long,String> sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
@Override
@ -875,7 +887,7 @@ public class UserVisitAnalyze {
//获取每一个品类的Session Top10
private static void getTop10Session(JavaSparkContext sc, Long taskId, JavaPairRDD<String, Row> commonFullClickInfoRDD, List<Tuple2<CategorySortKey, String>> top10CategoryIds) {
private static void getTop10Session(JavaSparkContext sc, final Long taskId, JavaPairRDD<String, Row> sessionInfoPairRDD, List<Tuple2<CategorySortKey, String>> top10CategoryIds) {
List<Tuple2<Long,Long>> categoryIdList=new ArrayList<Tuple2<Long, Long>>();
for(Tuple2<CategorySortKey, String> top10CategoryId:top10CategoryIds)
{
@ -886,7 +898,7 @@ public class UserVisitAnalyze {
//生成一份RDD
JavaPairRDD<Long,Long> top10CategoryIdsRDD=sc.parallelizePairs(categoryIdList);
//按照SessionId进行分组
JavaPairRDD<String,Iterable<Row>> gourpBySessionIdRDD=commonFullClickInfoRDD.groupByKey();
JavaPairRDD<String,Iterable<Row>> gourpBySessionIdRDD=sessionInfoPairRDD.groupByKey();
//计算每一个session对品类的点击次数
JavaPairRDD<Long,String> categorySessionCountRDD=gourpBySessionIdRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
@Override
@ -929,12 +941,85 @@ public class UserVisitAnalyze {
//根据品类分组
JavaPairRDD<Long,Iterable<String>> top10CategorySessionCountGroupRDD=top10CategorySessionCountRDD.groupByKey();
JavaPairRDD<Long,String> top10CategorySessionRDD=top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Iterable<String>>, Long,String>() {
JavaPairRDD<String,String> top10CategorySessionRDD=top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Iterable<String>>, String,String>() {
@Override
public Iterable<Tuple2<Long, String>> call(Tuple2<Long, Iterable<String>> tuple2) throws Exception {
public Iterable<Tuple2<String, String>> call(Tuple2<Long, Iterable<String>> tuple2) throws Exception {
List<Top10CategorySession> top10CategorySessionList=new ArrayList<Top10CategorySession>();
Long categoryId=tuple2._1;
String[] top10Sessions=new String[10];
List<Tuple2<String,String>> sessionIdList=new ArrayList<Tuple2<String, String>>();
// Keep top10Sessions as a ranking of "sessionId,count" strings ordered by count, descending.
// Slots that have not been filled yet stay null and always sit at the tail of the array.
for (String sessionCount : tuple2._2)
{
    // Only the count part (index 1 after splitting on ",") is needed to rank the entry.
    long count = Long.parseLong(sessionCount.split(",")[1]);
    for (int i = 0; i < top10Sessions.length; i++)
    {
        if (top10Sessions[i] == null)
        {
            // First free slot: store the entry once and stop. Without this break the same
            // entry would be copied into every remaining empty slot, producing duplicates.
            top10Sessions[i] = sessionCount;
            break;
        }
        long rankedCount = Long.parseLong(top10Sessions[i].split(",")[1]);
        if (count > rankedCount)
        {
            // Shift the lower-ranked entries one position down (the last one falls off),
            // then insert the new entry at position i.
            for (int j = top10Sessions.length - 1; j > i; j--) {
                top10Sessions[j] = top10Sessions[j - 1];
            }
            top10Sessions[i] = sessionCount;
            break;
        }
    }
}
//封装数据
for (int i=0;i<top10Sessions.length;i++)
{
if(top10Sessions[i]!=null)
{
Top10CategorySession top10CategorySession=new Top10CategorySession();
String sessionId=top10Sessions[i].split(",")[0];
Long count=Long.valueOf(top10Sessions[i].split(",")[1]);
top10CategorySession.set(taskId,categoryId,sessionId,count);
top10CategorySessionList.add(top10CategorySession);
sessionIdList.add(new Tuple2<String, String>(sessionId,sessionId));
}
}
//批量插入数据库
DaoFactory.getTop10CategorySessionDao().batchInsert(top10CategorySessionList);
return sessionIdList;
}
});
return null;
//3. Fetch the detail rows of the selected sessions and save them to the database.
// Joins top10CategorySessionRDD with sessionInfoPairRDD on the pair key, so every row of a
// matching key is paired with it.
// NOTE(review): the left side appears to hold (sessionId, sessionId) pairs emitted per category;
// a session that ranks for several categories would presumably be joined (and inserted) once per
// occurrence - confirm whether duplicates are acceptable.
JavaPairRDD<String,Tuple2<String,Row>> sessionDetailRDD= top10CategorySessionRDD.join(sessionInfoPairRDD);
// Process one partition at a time so that each partition ends with a single batchInsert call.
sessionDetailRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Tuple2<String, Row>>>>() {
@Override
public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> tuple2Iterator) throws Exception {
// Accumulates one SessionDetail per joined row of this partition.
List<SessionDetail> sessionDetailList=new ArrayList<SessionDetail>();
while(tuple2Iterator.hasNext())
{
Tuple2<String, Tuple2<String, Row>> tuple2=tuple2Iterator.next();
// tuple2._1 is the join key; tuple2._2._2 is the Row coming from sessionInfoPairRDD.
Row row=tuple2._2._2;
String sessionId=tuple2._1;
// Columns are read by position (1, 3-11); the meaning of each index is taken from the
// variable names below - TODO confirm against the schema of the action table.
// NOTE(review): getLong on a null column would presumably fail - confirm that
// columns 1, 3, 6 and 7 are never null for the rows reaching this point.
Long userId=row.getLong(1);
Long pageId=row.getLong(3);
String actionTime=row.getString(4);
String searchKeyWard=row.getString(5);
Long clickCategoryId=row.getLong(6);
Long clickProducetId=row.getLong(7);
String orderCategoryId=row.getString(8);
String orderProducetId=row.getString(9);
String payCategoryId=row.getString(10);
String payProducetId=row.getString(11);
SessionDetail sessionDetail=new SessionDetail();
sessionDetail.set(taskId,userId,sessionId,pageId,actionTime,searchKeyWard,clickCategoryId,clickProducetId,orderCategoryId,orderProducetId,payCategoryId,payProducetId);
sessionDetailList.add(sessionDetail);
}
// One batch insert per partition, after all of its rows have been collected.
DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
}
});
}

Loading…
Cancel
Save