From 7ef91d76cfdbda8acb3b17f33f841d130f298f75 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=8A=9D=E9=85=92=E5=8D=83=E7=99=BE?= <3489241516@qq.com>
Date: Tue, 17 Dec 2024 01:03:37 +0800
Subject: [PATCH] =?UTF-8?q?1.=E7=8E=AF=E5=A2=83=E6=90=AD=E5=BB=BA=E5=AE=8C?=
=?UTF-8?q?=E6=88=90=202.=E7=9B=B8=E5=85=B3=E7=9A=84=E5=B7=A5=E5=85=B7?=
=?UTF-8?q?=E7=B1=BB=E7=BC=96=E5=86=99=E5=AE=8C=E6=88=90=203.=E9=85=8D?=
=?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=E7=AE=A1=E7=90=86=E7=B1=BB=E7=BC=96?=
=?UTF-8?q?=E5=86=99=E5=AE=8C=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/workspace.xml | 19 +-
.../cn/edu/hust/session/UserVisitAnalyze.java | 1524 ++++++++++++-----
2 files changed, 1121 insertions(+), 422 deletions(-)
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index ad194c8..f2b3985 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,7 +4,7 @@
-
+
@@ -478,7 +478,8 @@
-
+
+
1529592741848
@@ -598,7 +599,15 @@
1734363388179
-
+
+
+ 1734365844392
+
+
+
+ 1734365844392
+
+
@@ -721,10 +730,10 @@
-
-
+
+
diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
index 1d62b3b..95ecb8c 100644
--- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
+++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
@@ -956,214 +956,482 @@ public class UserVisitAnalyze {
// 之后针对每个小时,创建或获取对应的随机索引列表,通过随机函数生成指定数量(extractSize)的唯一随机索引并添加到列表中。
// 最终目的是构建好dateRandomExtractMap这个数据结构,为后续依据这些随机索引从原始会话数据中真正抽取相应的会话数据提供依据,实现按照一定规则随机抽取会话数据的功能,以便进行例如抽样分析等相关业务操作。
-
- //2.将上面计算的RDD进行分组,然后使用FlatMap进行压平,然后判断是否在索引中,如果在,那么将这个信息持久化
- JavaPairRDD> time2GroupRDD=mapDataRDD.groupByKey();
- //将抽取的信息持久化到数据库,并返回SessionIds对,然后和以前的信息Join
- JavaPairRDD sessionIds= time2GroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() {
+// 2.将上面计算的RDD进行分组,然后使用FlatMap进行压平,然后判断是否在索引中,如果在,那么将这个信息持久化
+ JavaPairRDD> time2GroupRDD = mapDataRDD.groupByKey();
+// 调用mapDataRDD(应该是之前已经存在且经过一定处理的JavaPairRDD)的groupByKey方法进行分组操作。
+// 按照元素的键进行分组,将具有相同键的元素的对应值汇聚在一起,形成新的键值对存储在time2GroupRDD中,
+// 新的JavaPairRDD中键的类型为String,值的类型为Iterable,表示对应键的多个值组成的可迭代集合。
+ // 将抽取的信息持久化到数据库,并返回SessionIds对,然后和以前的信息Join
+ JavaPairRDD sessionIds = time2GroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() {
+ // 调用time2GroupRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
+ // 用于定义将输入的键值对(这里输入类型是Tuple2>,即前面分组后的结果类型)
+ // 转换为零个、一个或多个新的键值对(输出类型是Tuple2,也就是新的JavaPairRDD的键值对类型)的逻辑。
@Override
public Iterable> call(Tuple2> tuple2) throws Exception {
- String dateStr=tuple2._1;
- String date=dateStr.split("_")[0];
- String hour=dateStr.split("_")[1];
- //使用一个List存储sessionId
- List> sessionIds=new ArrayList>();
- List indexList=dateRandomExtractMap.get(date).get(hour);
- //使用一个list保存需要持久化到数据库的对象
- List sessionRandomExtractList=new ArrayList();
- int index=0;
- for (String infos:tuple2._2) {
- if(indexList.contains(Long.valueOf(index)))
- {
- //构建SessionRandomExtract
- SessionRandomExtract sessionRandomExtract=new SessionRandomExtract();
- final String sessionId=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SESSIONID);
- String startTime=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_START_TIME);
- String searchKeyWards=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SERACH_KEYWORDS);
- String clickCategoryIds=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_CLICK_CATEGORYIDS);
+ // 当执行flatMapToPair操作时,会针对time2GroupRDD中的每一个元素(即每个键值对)调用这个call方法,
+ // 参数tuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2>。
+ String dateStr = tuple2._1;
+ // 从传入的键值对tuple2中获取其键(Key)部分,并将其赋值给dateStr变量,
+ // 从后续代码对dateStr的处理来看,这个键应该是按照特定格式组织的字符串,可能包含日期和时间等相关信息。
+ String date = dateStr.split("_")[0];
+ // 调用split方法,以“_”作为分隔符,对dateStr字符串进行拆分操作,取拆分后得到的第一个子字符串,
+ // 并将其赋值给date变量,按照推测,这个部分应该是代表日期的信息,具体格式依实际业务而定。
+ String hour = dateStr.split("_")[1];
+ // 同样以“_”作为分隔符拆分dateStr,取拆分后的第二个子字符串,赋值给hour变量,
+ // 通常其代表的是小时相关的内容,具体含义要结合业务场景判断。通过这样的拆分操作,后续可依据日期和小时查找对应索引等信息来进一步处理数据。
+ // 使用一个List存储sessionId
+ List> sessionIds = new ArrayList>();
+ // 创建一个名为sessionIds的ArrayList列表,用于存储特定格式的键值对,其键和值的类型都是String,
+ // 主要用来存放与sessionId相关的键值对,方便后续和其他数据集基于sessionId进行关联等操作,提前整理好对应的数据格式进行存储。
+ List indexList = dateRandomExtractMap.get(date).get(hour);
+ // 这里涉及到dateRandomExtractMap这个变量,推测它是类似嵌套的映射结构(可能是Map>>类型),
+ // 外层的键对应日期,内层的键对应小时,值是由Long类型元素组成的列表。
+ // 首先通过dateRandomExtractMap.get(date)依据前面获取到的date(代表日期的字符串)查找对应日期下的内层映射,
+ // 再通过.get(hour)根据hour(代表小时的字符串)从内层映射中获取对应小时下存储的Long类型元素组成的列表,此列表就是indexList,
+ // 后续将用它作为判断当前处理的数据中哪些元素需要进行持久化等操作的依据。
+ // 使用一个list保存需要持久化到数据库的对象
+ List sessionRandomExtractList = new ArrayList();
+ // 创建一个名为sessionRandomExtractList的ArrayList列表,用于存放SessionRandomExtract类型的对象。
+ // SessionRandomExtract是自定义的实体类,用于封装从输入数据中提取出来、准备持久化到数据库的业务相关数据。
+ // 后续会把符合特定条件的数据封装成该类的对象,并添加到这个列表中,虽当前代码没展示具体持久化数据库的操作,但整体是为后续做准备。
+ int index = 0;
+ // 初始化一个名为index的整型变量,并将其初始值设为0,用于在后续遍历tuple2的值(Iterable部分)中的每个元素时,
+ // 作为元素的索引计数,方便和前面获取到的indexList中的索引值进行比较,以判断每个元素是否需要进行相应处理操作。
+ for (String infos : tuple2._2) {
+ // 开始一个for循环,用于遍历tuple2键值对中的值部分(即tuple2._2,其类型是Iterable),
+ // 在每次循环中,会取出其中一个字符串元素赋值给infos变量,后续就在循环体内部对每个infos字符串进行相关解析和处理操作。
+ if (indexList.contains(Long.valueOf(index))) {
+ // 在循环体中,先将当前的index(正在遍历的元素在tuple2._2中的索引位置)转换为Long类型(因为indexList是Long类型的列表),
+ // 然后通过contains方法判断这个索引值是否在indexList中。若在其中,意味着当前遍历到的infos对应的元素需进行后续特定处理,
+ // 如下文构建对象、提取信息并持久化等操作。
+ // 构建SessionRandomExtract
+ SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
+ // 创建一个SessionRandomExtract类型的对象sessionRandomExtract,它用于封装从infos字符串中提取的业务相关信息,
+ // 后续会对其各个属性进行赋值操作,再将其添加到sessionRandomExtractList列表中,最终实现将业务信息持久化到数据库的目的。
+ final String sessionId = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SESSIONID);
+ // 通过调用StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 从infos字符串中按照“|”作为分隔符提取出对应Constants.FIELD_SESSIONID这个字段对应的内容,
+ // Constants.FIELD_SESSIONID应是在Constants类中定义好的常量,用于标识sessionId在字符串中的位置等信息,
+ // 提取出的内容赋值给sessionId变量。
+ String startTime = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_START_TIME);
+ // 同理,从infos字符串中提取出代表开始时间相关字段内容,赋值给startTime变量,用于后续给sessionRandomExtract对象设置属性值。
+ String searchKeyWards = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SERACH_KEYWORDS);
+ // 从infos字符串中提取出代表搜索关键词相关字段内容,赋值给searchKeyWards变量,为后续设置对象属性做准备。
+ String clickCategoryIds = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_CLICK_CATEGORYIDS);
+ // 从infos字符串中提取出代表点击类别IDs相关字段内容,赋值给clickCategoryIds变量,以便后续设置对象属性。
sessionRandomExtract.set(taskId,sessionId,startTime,searchKeyWards,clickCategoryIds);
- //添加到List中然后持久化到数据库中
+ // 调用sessionRandomExtract对象的set方法(假设SessionRandomExtract类有此方法用于设置各属性值),
+ // 将前面提取出来的各个业务相关信息以及外部定义好的taskId(与当前任务相关的标识,外部已赋值确定)设置到sessionRandomExtract对象中,
+ // 完成对该对象的属性赋值操作,使其封装好要持久化的数据信息。
+ // 添加到List中然后持久化到数据库中
sessionRandomExtractList.add(sessionRandomExtract);
+ // 将已经封装好业务信息的sessionRandomExtract对象添加到sessionRandomExtractList列表中,
+ // 后续应会有其他代码利用这个列表中的对象进行数据库持久化操作,将这些对象代表的数据插入数据库相应表中存储。
sessionIds.add(new Tuple2(sessionId,sessionId));
+ // 创建一个键值对,其键和值都是当前提取出来的sessionId,然后将这个键值对添加到前面创建的sessionIds列表中,
+ // 这样做可能是为了后续和其他数据集基于sessionId进行关联操作(如Join操作)时方便整理出对应的关联数据对,便于后续整合和分析数据。
}
index++;
+ // 在每次循环结束后,将index变量的值自增1,使其指向下一个元素在tuple2._2中的索引位置,
+ // 以便在下一轮循环中正确判断对应元素是否在indexList中,从而继续对下一个元素进行相应处理操作。
}
- //持久化到数据库
- DaoFactory.getSessionRandomExtractDao().batchInsert(sessionRandomExtractList);
return sessionIds;
+ // 返回包含sessionId键值对的列表sessionIds,这是flatMapToPair方法要求的返回类型,
+ // 最终这些返回的键值对会构成新的JavaPairRDD(即前面定义的sessionIds变量所代表的RDD)。
}
});
-
- //3. 获取session的明细数据保存到数据库
- JavaPairRDD> sessionDetailRDD= sessionIds.join(sessionInfoPairRDD);
- sessionDetailRDD.foreachPartition(new VoidFunction>>>() {
- @Override
- public void call(Iterator>> tuple2Iterator) throws Exception {
- List sessionDetailList=new ArrayList();
- while(tuple2Iterator.hasNext())
- {
- Tuple2> tuple2=tuple2Iterator.next();
- Row row=tuple2._2._2;
- String sessionId=tuple2._1;
- Long userId=row.getLong(1);
- Long pageId=row.getLong(3);
- String actionTime=row.getString(4);
- String searchKeyWard=row.getString(5);
- Long clickCategoryId=row.getLong(6);
- Long clickProducetId=row.getLong(7);
- String orderCategoryId=row.getString(8);
- String orderProducetId=row.getString(9);
- String payCategoryId=row.getString(10);
- String payProducetId=row.getString(11);
- SessionDetail sessionDetail=new SessionDetail();
- sessionDetail.set(taskId,userId,sessionId,pageId,actionTime,searchKeyWard,clickCategoryId,clickProducetId,orderCategoryId,orderProducetId,payCategoryId,payProducetId);
- sessionDetailList.add(sessionDetail);
- }
- DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
- }
- });
-
}
//计算各个范围的占比,并持久化到数据库
- private static void calculateAndPersist(String value,Long taskId) {
+ private static void calculateAndPersist(String value, Long taskId) {
+ // 此方法用于根据传入的字符串value和任务ID taskId进行相关数据的计算,
+ // 并将计算结果封装成对象后持久化到数据库中,整体是数据处理及存储相关的逻辑。
+
//System.out.println(value);
- Long sessionCount=Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));
- //各个范围的访问时长
- Double visit_Length_1s_3s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));
- Double visit_Length_4s_6s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));
- Double visit_Length_7s_9s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));
- Double visit_Length_10s_30s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));
- Double visit_Length_30s_60s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));
- Double visit_Length_1m_3m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));
- Double visit_Length_3m_10m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));
- Double visit_Length_10m_30m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));
- Double visit_Length_30m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));
-
- //各个范围的访问步长
- Double step_Length_1_3=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));
- Double step_Length_4_6=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));
- Double step_Length_7_9=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));
- Double step_Length_10_30=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));
- Double step_Length_30_60=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));
- Double step_Length_60=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));
-
- //访问时长对应的sesison占比,保留3位小数
- double visit_Length_1s_3s_ratio=NumberUtils.formatDouble(visit_Length_1s_3s/sessionCount,3);
- double visit_Length_4s_6s_ratio=NumberUtils.formatDouble(visit_Length_4s_6s/sessionCount,3);
- double visit_Length_7s_9s_ratio=NumberUtils.formatDouble(visit_Length_7s_9s/sessionCount,3);
- double visit_Length_10s_30s_ratio=NumberUtils.formatDouble(visit_Length_10s_30s/sessionCount,3);
- double visit_Length_30s_60s_ratio=NumberUtils.formatDouble(visit_Length_30s_60s/sessionCount,3);
- double visit_Length_1m_3m_ratio=NumberUtils.formatDouble(visit_Length_1m_3m/sessionCount,3);
- double visit_Length_3m_10m_ratio=NumberUtils.formatDouble(visit_Length_3m_10m/sessionCount,3);
- double visit_Length_10m_30m_ratio=NumberUtils.formatDouble(visit_Length_10m_30m/sessionCount,3);
- double visit_Length_30m_ratio=NumberUtils.formatDouble(visit_Length_30m/sessionCount,3);
-
- //访问步长对应的session占比,保留3位小数
- double step_Length_1_3_ratio= NumberUtils.formatDouble(step_Length_1_3/sessionCount,3);
- double step_Length_4_6_ratio=NumberUtils.formatDouble(step_Length_4_6/sessionCount,3);
- double step_Length_7_9_ratio=NumberUtils.formatDouble(step_Length_7_9/sessionCount,3);
- double c=NumberUtils.formatDouble(step_Length_10_30/sessionCount,3);
- double step_Length_30_60_ratio=NumberUtils.formatDouble(step_Length_30_60/sessionCount,3);
- double step_Length_60_ratio=NumberUtils.formatDouble(step_Length_60/sessionCount,3);
-
- SessionAggrStat sessionAggrStat=new SessionAggrStat();
- sessionAggrStat.set(taskId,sessionCount,visit_Length_1s_3s_ratio,visit_Length_4s_6s_ratio,
- visit_Length_7s_9s_ratio,visit_Length_10s_30s_ratio,visit_Length_30s_60s_ratio,
- visit_Length_1m_3m_ratio,visit_Length_3m_10m_ratio,visit_Length_10m_30m_ratio,visit_Length_30m_ratio
- ,step_Length_1_3_ratio,step_Length_4_6_ratio,step_Length_7_9_ratio,step_Length_7_9_ratio,step_Length_30_60_ratio,step_Length_60_ratio);
- List sessionAggrStatList=new ArrayList();
+ // 这行代码被注释掉了,原本可能是用于调试输出value的值,查看传入的字符串具体内容。
+
+ Long sessionCount = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从传入的value字符串中提取出对应Constants.SESSION_COUNT(应该是在Constants类中定义好的常量,
+ // 用于标识sessionCount在字符串中的位置等信息)这个字段对应的内容,然后将其转换为Long类型,赋值给sessionCount变量,
+ // 该变量用于记录会话数量相关信息。
+
+ // 各个范围的访问时长
+ Double visit_Length_1s_3s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));
+ // 同样使用StringUtils工具类按照“|”分隔符从value字符串中提取出对应Constants.TIME_PERIOD_1s_3s字段对应的内容,
+ // 并转换为Double类型,赋值给visit_Length_1s_3s变量,用于记录访问时长在1秒到3秒这个范围的相关数据。
+
+ Double visit_Length_4s_6s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));
+ // 从value字符串中提取对应Constants.TIME_PERIOD_4s_6s字段内容并转换为Double类型,赋值给visit_Length_4s_6s变量,
+ // 用于记录访问时长在4秒到6秒范围的数据。
+
+ Double visit_Length_7s_9s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));
+ // 提取value字符串中对应Constants.TIME_PERIOD_7s_9s字段内容转换为Double类型,赋值给visit_Length_7s_9s变量,
+ // 代表访问时长在7秒到9秒范围的数据。
+
+ Double visit_Length_10s_30s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));
+ // 按指定分隔符从value字符串中提取对应字段内容并转换为Double类型,赋值给visit_Length_10s_30s变量,
+ // 用于记录访问时长在10秒到30秒范围的数据。
+
+ Double visit_Length_30s_60s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));
+ // 从value字符串提取对应字段内容转换为Double类型后赋值给visit_Length_30s_60s变量,
+ // 表示访问时长在30秒到60秒范围的数据。
+
+ Double visit_Length_1m_3m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));
+ // 提取value字符串中对应Constants.TIME_PERIOD_1m_3m字段内容转换为Double类型,赋值给visit_Length_1m_3m变量,
+ // 用于记录访问时长在1分钟到3分钟范围的数据(这里的“m”应该表示分钟,结合业务场景理解时长范围)。
+
+ Double visit_Length_3m_10m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));
+ // 按照分隔符从value字符串获取对应字段内容转换为Double类型,赋值给visit_Length_3m_10m变量,
+ // 代表访问时长在3分钟到10分钟范围的数据。
+
+ Double visit_Length_10m_30m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));
+ // 从value字符串提取对应字段内容转换为Double类型后赋值给visit_Length_10m_30m变量,
+ // 用于记录访问时长在10分钟到30分钟范围的数据。
+
+ Double visit_Length_30m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));
+ // 提取value字符串中对应Constants.TIME_PERIOD_30m字段内容转换为Double类型,赋值给visit_Length_30m变量,
+ // 表示访问时长大于30分钟的数据(这里根据变量名及业务逻辑推测其含义)。
+
+ // 各个范围的访问步长
+ Double step_Length_1_3 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));
+ // 使用StringUtils工具类从value字符串中提取对应Constants.STEP_PERIOD_1_3字段内容并转换为Double类型,
+ // 赋值给step_Length_1_3变量,用于记录访问步长在1到3这个范围的数据(具体单位等需结合业务确定)。
+
+ Double step_Length_4_6 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));
+ // 从value字符串提取对应Constants.STEP_PERIOD_4_6字段内容转换为Double类型,赋值给step_Length_4_6变量,
+ // 代表访问步长在4到6范围的数据。
+
+ Double step_Length_7_9 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));
+ // 按照分隔符从value字符串中获取对应字段内容转换为Double类型,赋值给step_Length_7_9变量,
+ // 用于记录访问步长在7到9范围的数据。
+
+ Double step_Length_10_30 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));
+ // 提取value字符串中对应Constants.STEP_PERIOD_10_30字段内容转换为Double类型,赋值给step_Length_10_30变量,
+ // 表示访问步长在10到30范围的数据。
+
+ Double step_Length_30_60 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));
+ // 从value字符串获取对应字段内容转换为Double类型后赋值给step_Length_30_60变量,
+ // 用于记录访问步长在30到60范围的数据。
+
+ Double step_Length_60 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));
+ // 提取value字符串中对应Constants.STEP_PERIOD_60字段内容转换为Double类型,赋值给step_Length_60变量,
+ // 代表访问步长大于60的数据(根据变量名及业务场景推测其含义)。
+
+ // 访问时长对应的sesison占比,保留3位小数
+ double visit_Length_1s_3s_ratio = NumberUtils.formatDouble(visit_Length_1s_3s / sessionCount, 3);
+ // 先计算访问时长在1秒到3秒范围的数据(visit_Length_1s_3s)与会话数量(sessionCount)的比值,
+ // 然后通过NumberUtils工具类(自定义的用于数字处理的工具类,推测有格式化数字的功能)的formatDouble方法,
+ // 将该比值保留3位小数,结果赋值给visit_Length_1s_3s_ratio变量,用于表示此访问时长范围对应的会话占比情况。
+
+ double visit_Length_4s_6s_ratio = NumberUtils.formatDouble(visit_Length_4s_6s / sessionCount, 3);
+ // 计算访问时长在4秒到6秒范围的数据与会话数量的比值,并使用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_4s_6s_ratio变量,代表该时长范围对应的会话占比。
+
+ double visit_Length_7s_9s_ratio = NumberUtils.formatDouble(visit_Length_7s_9s / sessionCount, 3);
+ // 计算访问时长在7秒到9秒范围的数据和会话数量的比值,经NumberUtils工具类格式化保留3位小数后,
+ // 赋值给visit_Length_7s_9s_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_10s_30s_ratio = NumberUtils.formatDouble(visit_Length_10s_30s / sessionCount, 3);
+ // 计算访问时长在10秒到30秒范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_10s_30s_ratio变量,表示该时长范围对应的会话占比。
+
+ double visit_Length_30s_60s_ratio = NumberUtils.formatDouble(visit_Length_30s_60s / sessionCount, 3);
+ // 计算访问时长在30秒到60秒范围的数据和会话数量的比值,利用NumberUtils工具类保留3位小数后,
+ // 赋值给visit_Length_30s_60s_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_1m_3m_ratio = NumberUtils.formatDouble(visit_Length_1m_3m / sessionCount, 3);
+ // 计算访问时长在1分钟到3分钟范围的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数,
+ // 赋值给visit_Length_1m_3m_ratio变量,代表该时长范围对应的会话占比。
+
+ double visit_Length_3m_10m_ratio = NumberUtils.formatDouble(visit_Length_3m_10m / sessionCount, 3);
+ // 计算访问时长在3分钟到10分钟范围的数据和会话数量的比值,使用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_3m_10m_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_10m_30m_ratio = NumberUtils.formatDouble(visit_Length_10m_30m / sessionCount, 3);
+ // 计算访问时长在10分钟到30分钟范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_10m_30m_ratio变量,表示该时长范围对应的会话占比。
+
+ double visit_Length_30m_ratio = NumberUtils.formatDouble(visit_Length_30m / sessionCount, 3);
+ // 计算访问时长大于30分钟的数据与会话数量的比值,利用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_30m_ratio变量,用于记录此情况对应的会话占比。
+
+ // 访问步长对应的session占比,保留3位小数
+ double step_Length_1_3_ratio = NumberUtils.formatDouble(step_Length_1_3 / sessionCount, 3);
+ // 计算访问步长在1到3范围的数据(step_Length_1_3)与会话数量(sessionCount)的比值,
+ // 借助NumberUtils工具类的formatDouble方法保留3位小数,赋值给step_Length_1_3_ratio变量,
+ // 用于表示该访问步长范围对应的会话占比情况。
+
+ double step_Length_4_6_ratio = NumberUtils.formatDouble(step_Length_4_6 / sessionCount, 3);
+ // 计算访问步长在4到6范围的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数后,
+ // 赋值给step_Length_4_6_ratio变量,代表该步长范围对应的会话占比。
+
+ double step_Length_7_9_ratio = NumberUtils.formatDouble(step_Length_7_9 / sessionCount, 3);
+ // 计算访问步长在7到9范围的数据和会话数量的比值,使用NumberUtils工具类保留3位小数,
+ // 赋值给step_Length_7_9_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double c = NumberUtils.formatDouble(step_Length_10_30 / sessionCount, 3);
+ // 计算访问步长在10到30范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给c变量(这里变量名可能不太符合规范,推测应该是step_Length_10_30_ratio之类更表意清晰的名称,需根据实际情况确认),
+ // 用于表示该步长范围对应的会话占比情况。
+
+ double step_Length_30_60_ratio = NumberUtils.formatDouble(step_Length_30_60 / sessionCount, 3);
+ // 计算访问步长在30到60范围的数据和会话数量的比值,利用NumberUtils工具类保留3位小数后,
+ // 赋值给step_Length_30_60_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double step_Length_60_ratio = NumberUtils.formatDouble(step_Length_60 / sessionCount, 3);
+ // 计算访问步长大于60的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数,
+ // 赋值给step_Length_60_ratio变量,代表该情况对应的会话占比。
+
+ SessionAggrStat sessionAggrStat = new SessionAggrStat();
+ // 创建一个SessionAggrStat类型的对象sessionAggrStat,此类应该是自定义的用于封装要持久化到数据库的相关数据的实体类。
+
+ sessionAggrStat.set(taskId, sessionCount, visit_Length_1s_3s_ratio, visit_Length_4s_6s_ratio,
+ visit_Length_7s_9s_ratio, visit_Length_10s_30s_ratio, visit_Length_30s_60s_ratio,
+ visit_Length_1m_3m_ratio, visit_Length_3m_10m_ratio, visit_Length_10m_30m_ratio, visit_Length_30m_ratio
+ , step_Length_1_3_ratio, step_Length_4_6_ratio, step_Length_7_9_ratio, step_Length_7_9_ratio, step_Length_30_60_ratio, step_Length_60_ratio);
+ // 调用sessionAggrStat对象的set方法(假设SessionAggrStat类有此方法用于设置对象的各个属性值),
+ // 将前面计算好的任务ID、会话数量、各访问时长及访问步长对应的会话占比等数据设置到sessionAggrStat对象中,
+ // 完成对该对象属性的赋值操作,使其封装好要持久化到数据库的全部数据信息。
+
+ List sessionAggrStatList = new ArrayList();
+ // 创建一个名为sessionAggrStatList的ArrayList列表,用于存放SessionAggrStat类型的对象,
+ // 在这里主要是为了方便后续批量插入数据库操作,将单个封装好数据的对象放入列表中进行统一处理。
+
sessionAggrStatList.add(sessionAggrStat);
+ // 将前面创建并赋值好的sessionAggrStat对象添加到sessionAggrStatList列表中,准备进行批量插入数据库操作。
+
// 插入数据库
DaoFactory.getSessionAggrStatDao().batchInsert(sessionAggrStatList);
+ // 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getSessionAggrStatDao方法,
+ // 获取到用于操作SessionAggrStat数据的DAO对象,然后调用其batchInsert方法,
+ // 将包含要持久化数据的sessionAggrStatList列表传入,实现将数据批量插入到数据库中的功能,完成整个数据持久化流程。
}
+
+
/**
* 获取top10热门品类
* @param taskId
* @param sessionId2DetailRDD
*/
- private static List> getTop10Category(Long taskId, JavaPairRDD sessionId2DetailRDD) {
- //1.第一步已抽离出方法getFilterFullInfoRDD
- //2。获取访问的品类id,访问过表示点击,下单,支付
- JavaPairRDD categoryRDD=sessionId2DetailRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ private static List> getTop10Category(Long taskId, JavaPairRDD sessionId2DetailRDD) {
+ // 此方法名为getTop10Category,作用是获取排名前10的品类相关信息(具体从返回值及后续代码逻辑推测),
+ // 接收一个任务ID(taskId)和一个JavaPairRDD类型的sessionId2DetailRDD作为参数,
+ // 下面的代码将基于这个RDD逐步提取、处理品类相关数据,这里先从获取访问的品类id开始着手处理。
+
+ // 1.第一步已抽离出方法getFilterFullInfoRDD
+ // 此处表明前面应该有一个名为getFilterFullInfoRDD的方法已经执行过了,它完成了一部分前置的数据处理操作,
+ // 不过这里并没有展示该方法具体的实现内容,但本方法是在此基础上继续后续流程的。
+
+ // 2。获取访问的品类id,访问过表示点击,下单,支付
+ JavaPairRDD categoryRDD = sessionId2DetailRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ // 调用sessionId2DetailRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例。
+ // 这个接口用于定义将输入的键值对(此处输入类型是Tuple2,也就是sessionId2DetailRDD里元素的类型)
+ // 转换为零个、一个或多个新的键值对(输出类型为Tuple2,是即将生成的新JavaPairRDD的键值对类型)的逻辑,
+ // 目的在于从原始的RDD数据中提取出与访问品类相关的信息,并整理成新的键值对形式方便后续处理。
+
@Override
public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
- Row row=stringRowTuple2._2;
- List> visitCategoryList=new ArrayList>();
- Long clickCategoryId=row.getLong(6);
- //点击品类的id
- if(clickCategoryId!=null)
- visitCategoryList.add(new Tuple2(clickCategoryId,clickCategoryId));
-
- if(row.get(8)!=null){
- String[] orderCategoryIdsSplited=row.getString(8).split(",");
- for (String orderCategoryId:
+ // 重写了PairFlatMapFunction接口中的call方法,当执行flatMapToPair操作时,针对sessionId2DetailRDD中的每一个元素(每个键值对),
+ // 都会调用这个call方法进行处理,参数stringRowTuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
+
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(是Row类型的数据),赋值给row变量。
+ // Row类型通常代表一行包含多个字段的数据,后续会从这个row中提取出与品类相关的各个字段信息,例如点击品类的id等。
+
+ List> visitCategoryList = new ArrayList>();
+ // 创建一个名为visitCategoryList的ArrayList列表,用于存放即将提取出来的与访问品类相关的键值对信息,
+ // 其键和值的类型都为Long,后续会根据不同的业务场景(点击、下单、支付)把相应的品类ID信息封装成键值对添加到这个列表中。
+
+ Long clickCategoryId = row.getLong(6);
+ // 从row数据行中获取索引为6的字段值(从代码上下文推测这个字段应该代表点击品类的ID),并将其转换为Long类型,赋值给clickCategoryId变量。
+ // 通过索引获取字段的方式表明Row对象内部的数据结构类似数组或者列表,每个索引位置对应不同的业务含义的字段,具体需要结合数据来源和定义来确定。
+
+ // 点击品类的id
+ if (clickCategoryId!= null)
+ visitCategoryList.add(new Tuple2(clickCategoryId, clickCategoryId));
+ // 判断点击品类的ID是否为空,如果不为空,说明存在点击品类相关的信息,
+ // 那么就创建一个键值对,其键和值都设置为该点击品类的ID(clickCategoryId),然后将这个键值对添加到visitCategoryList列表中,
+ // 这样做方便后续统计点击品类相关的情况以及和其他品类相关操作(如下单、支付品类信息)进行整合处理。
+
+ if (row.get(8)!= null) {
+ String[] orderCategoryIdsSplited = row.getString(8).split(",");
+ // 从row数据行中获取索引为8的字段(根据代码逻辑推测这个字段存储的是下单品类的ID集合,以逗号分隔的字符串形式存在),先判断其是否为空,
+ // 如果不为空,就通过split方法以逗号作为分隔符,将这个字符串拆分成一个字符串数组orderCategoryIdsSplited,数组中的每个元素就是一个下单品类的ID(以字符串形式呈现)。
+
+ for (String orderCategoryId :
orderCategoryIdsSplited) {
- visitCategoryList.add(new Tuple2(Long.valueOf(orderCategoryId),Long.valueOf(orderCategoryId)));
+ visitCategoryList.add(new Tuple2(Long.valueOf(orderCategoryId), Long.valueOf(orderCategoryId)));
+ // 遍历拆分后的下单品类ID字符串数组orderCategoryIdsSplited,对于其中的每个字符串形式的下单品类ID(orderCategoryId),
+ // 先将其转换为Long类型,然后创建一个键值对,键和值都设置为这个转换后的Long类型的下单品类ID,
+ // 最后将这个键值对添加到visitCategoryList列表中,用于记录下单的品类相关信息,方便后续统计和分析下单品类的情况。
}
}
- if(row.get(10)!=null){
- String[] payCategoryIdsSplited=row.getString(10).split(",");
- for (String payCategoryId:
+ if (row.get(10)!= null) {
+ String[] payCategoryIdsSplited = row.getString(10).split(",");
+ // 同样地,从row数据行中获取索引为10的字段(推测这个字段存储的是支付品类的ID集合,也是以逗号分隔的字符串形式),判断其是否为空,
+ // 若不为空,使用split方法以逗号作为分隔符将其拆分成字符串数组payCategoryIdsSplited,数组中的每个元素就是一个支付品类的ID(字符串形式)。
+
+ for (String payCategoryId :
payCategoryIdsSplited) {
- visitCategoryList.add(new Tuple2(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId)));
+ visitCategoryList.add(new Tuple2(Long.valueOf(payCategoryId), Long.valueOf(payCategoryId)));
+ // 遍历支付品类ID字符串数组payCategoryIdsSplited,针对每个字符串形式的支付品类ID(payCategoryId),
+ // 将其转换为Long类型后创建一个键值对,键和值都设置为这个转换后的Long类型的支付品类ID,
+ // 再把这个键值对添加到visitCategoryList列表中,以此记录支付的品类相关信息,便于后续对支付品类情况进行统计和处理。
}
}
return visitCategoryList;
+ // 将包含了点击、下单、支付等访问品类相关键值对信息的visitCategoryList列表返回,
+ // 这些键值对将会构成新的JavaPairRDD(也就是前面定义的categoryRDD)中的元素,用于后续进一步的数据处理操作,比如去重、统计等。
}
});
- //需要去重
- categoryRDD=categoryRDD.distinct();
- //3。计算各个品类的点击,下单和支付次数
- // 3.1 计算点击品类的数量
- JavaPairRDD clickCategoryRDD = getLClickCategoryRDD(sessionId2DetailRDD);
-
- // 3.2 计算下单的品类的数量
- JavaPairRDD orderCategoryRDD= getOrderCategoryRDD(sessionId2DetailRDD);
- // 3.3 计算支付的品类的数量
- JavaPairRDD payCategoryRDD=getPayCategoryRDD(sessionId2DetailRDD);
- //4.将上述计算的三个字段进行join,注意这里是LeftOuterJoin,因为有些品类只是点击了
- JavaPairRDD categoryCountRDD=joinCategoryAndData(categoryRDD,clickCategoryRDD,orderCategoryRDD,payCategoryRDD);
- //5.自定义二次排序的key
- JavaPairRDD sortKeyCountRDD=categoryCountRDD.mapToPair(new PairFunction, CategorySortKey, String>() {
+ // 需要去重
+ categoryRDD = categoryRDD.distinct();
+// 调用categoryRDD的distinct方法对其进行去重操作。因为之前通过flatMapToPair操作提取品类ID信息时,
+// 可能存在重复的品类ID记录,去重可以保证后续基于品类ID进行的各种统计、计算及分析等操作数据的准确性,
+// 经过去重后categoryRDD中每个品类ID将是唯一的,方便后续处理步骤使用。
+
+// 3。计算各个品类的点击,下单和支付次数
+// 3.1 计算点击品类的数量
+ JavaPairRDD clickCategoryRDD = getLClickCategoryRDD(sessionId2DetailRDD);
+// 调用名为getLClickCategoryRDD的自定义方法,传入sessionId2DetailRDD作为参数,
+// 该方法的作用应该是从sessionId2DetailRDD这个包含详细会话数据的RDD中提取出与点击品类数量相关的信息,
+// 并返回一个JavaPairRDD类型的RDD(即clickCategoryRDD),其中键值对可能表示品类ID以及对应的点击次数等信息,
+// 用于后续和其他品类相关统计数据进行整合分析。
+
+// 3.2 计算下单的品类的数量
+ JavaPairRDD orderCategoryRDD = getOrderCategoryRDD(sessionId2DetailRDD);
+// 同样地,调用自定义的getOrderCategoryRDD方法,以sessionId2DetailRDD为参数,
+// 此方法旨在从给定的RDD中提取出与下单品类数量相关的数据,返回一个JavaPairRDD类型的orderCategoryRDD,
+// 其键值对大概包含了品类ID以及对应的下单次数等内容,方便后续综合统计各个品类的不同行为次数情况。
+
+// 3.3 计算支付的品类的数量
+ JavaPairRDD payCategoryRDD = getPayCategoryRDD(sessionId2DetailRDD);
+// 调用getPayCategoryRDD这个自定义方法,输入sessionId2DetailRDD,
+// 该方法负责从传入的RDD中提取出与支付品类数量相关的信息,生成并返回JavaPairRDD类型的payCategoryRDD,
+// 其中的键值对可能体现了品类ID和对应的支付次数等信息,为后续全面分析品类相关行为数据做准备。
+
+// 4.将上述计算的三个字段进行join,注意这里是LeftOuterJoin,因为有些品类只是点击了
+ JavaPairRDD categoryCountRDD = joinCategoryAndData(categoryRDD, clickCategoryRDD, orderCategoryRDD, payCategoryRDD);
+// 调用自定义的joinCategoryAndData方法,将前面已经获取到的categoryRDD(经过去重后包含品类ID的RDD)、
+// clickCategoryRDD(点击品类数量相关的RDD)、orderCategoryRDD(下单品类数量相关的RDD)以及payCategoryRDD(支付品类数量相关的RDD)作为参数传入,
+// 进行LeftOuterJoin(左外连接)操作。左外连接的意义在于,即使某些品类只有点击行为(也就是在下单和支付相关的RDD中不存在对应记录),
+// 也能在连接结果中保留这些品类的信息,确保最终得到的categoryCountRDD包含了所有品类的综合情况(点击、下单、支付次数等信息整合在一起),
+// 其返回的JavaPairRDD类型的categoryCountRDD,键可能是品类ID,值可能是整合了点击、下单、支付次数等相关信息的字符串,方便后续进一步处理。
+
+// 5.自定义二次排序的key
+ JavaPairRDD sortKeyCountRDD = categoryCountRDD.mapToPair(new PairFunction, CategorySortKey, String>() {
+ // 调用categoryCountRDD的mapToPair方法,传入一个实现了PairFunction接口的匿名内部类实例,
+ // 这个操作的目的是基于categoryCountRDD中的元素(类型为Tuple2),按照自定义的逻辑转换生成新的键值对,
+ // 新键值对的类型是Tuple2,也就是即将生成的sortKeyCountRDD的元素类型,通过自定义键的规则来为后续排序做准备。
@Override
public Tuple2 call(Tuple2 longStringTuple2) throws Exception {
- String countInfo=longStringTuple2._2;
- Long clickCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CLICK_CATEGORY));
- Long orderCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Long payCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- CategorySortKey key=new CategorySortKey();
- key.set(clickCount,orderCount,payCount);
- return new Tuple2(key,countInfo);
+ // 重写了PairFunction接口中的call方法,当执行mapToPair操作时,会针对categoryCountRDD中的每一个元素(每个Tuple2类型的键值对),
+ // 调用这个call方法来执行具体的转换逻辑,参数longStringTuple2就是当前正在处理的那个键值对元素。
+
+ String countInfo = longStringTuple2._2;
+ // 从传入的键值对longStringTuple2中获取其值部分(是一个字符串类型的数据,从变量名countInfo推测它包含了品类相关的各种统计信息,比如点击、下单、支付次数等),
+ // 并将其赋值给countInfo变量,后续将从这个字符串中提取出具体的次数信息用于构建排序的关键对象。
+
+ Long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_CATEGORY));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从countInfo字符串中提取出对应Constants.FIELD_CLICK_CATEGORY(应该是在Constants类中定义好的常量,
+ // 用于标识点击品类次数在字符串中的位置等信息)这个字段对应的内容,然后将其转换为Long类型,赋值给clickCount变量,从而获取到点击品类的次数信息。
+
+ Long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 同样利用StringUtils工具类,从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容,将其转换为Long类型,
+ // 赋值给orderCount变量,以此获取下单品类的次数信息(此处代码中两次使用了Constants.FIELD_ORDER_CATEGORY来提取字段,可能存在错误,
+ // 正常应该分别对应不同的常量来准确提取下单次数和支付次数,需结合实际Constants类定义来确认)。
+
+ Long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 再次使用相同方式从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容并转换为Long类型,赋值给payCount变量,
+ // 获取支付品类的次数信息(同样可能存在提取字段常量使用不当的问题,需检查修正)。
+
+ CategorySortKey key = new CategorySortKey();
+ key.set(clickCount, orderCount, payCount);
+ // 创建一个CategorySortKey类型的对象key,此类应该是自定义的用于封装排序关键信息的实体类,
+ // 通过调用其set方法(假设CategorySortKey类有这样的方法用于设置对象属性值),将前面获取到的点击品类次数、下单品类次数、支付品类次数等信息设置到key对象中,
+ // 以此构建出用于排序的关键对象,后续可以根据这个对象中包含的品类次数信息来决定数据的排序规则。
+
+ return new Tuple2(key, countInfo);
+ // 返回一个新的键值对,其键为构建好的CategorySortKey类型的排序关键对象key,值为原来的countInfo字符串(包含完整的品类统计信息),
+ // 这些新生成的键值对将构成新的JavaPairRDD(即前面定义的sortKeyCountRDD),用于后续按照自定义规则进行排序操作。
}
});
- JavaPairRDD sortedCategoryRDD=sortKeyCountRDD.sortByKey(false);
- //取出前10个,写入数据库
- List> top10CategoryList=sortedCategoryRDD.take(10);
- List top10Categories=new ArrayList();
- for(Tuple2 tuple2:top10CategoryList)
- {
- String countInfo=tuple2._2;
- Long categoryId=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CATEGORY_ID));
- Long clickCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CLICK_CATEGORY));
- Long orderCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Long payCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Top10Category top10Category=new Top10Category();
- top10Category.set(taskId,categoryId,clickCount,orderCount,payCount);
+
+
+
+ JavaPairRDD sortedCategoryRDD = sortKeyCountRDD.sortByKey(false);
+// 调用sortKeyCountRDD的sortByKey方法,传入参数false,表示按照键(CategorySortKey类型)进行降序排序。
+// 这里的键(CategorySortKey)包含了之前构建的与品类相关的点击、下单、支付次数等用于排序的关键信息,
+// 通过这种排序方式,可以将品类数据按照设定的规则有序排列,使得访问量等综合情况更突出的品类排在前面,方便后续取出排名靠前的品类信息,
+// 排序后的结果存储在新的JavaPairRDD类型的sortedCategoryRDD中。
+
+// 取出前10个,写入数据库
+ List> top10CategoryList = sortedCategoryRDD.take(10);
+// 调用sortedCategoryRDD的take方法,该方法会从已经排好序的sortedCategoryRDD中取出前10个元素,
+// 每个元素是一个Tuple2类型的键值对,包含了品类相关的排序关键信息(键部分)以及完整的品类统计信息(值部分),
+// 取出的这10个元素组成的列表被赋值给top10CategoryList变量,这个列表就代表了访问量等综合情况排名前10的品类相关信息,后续将基于这些信息进行数据库持久化等操作。
+
+ List top10Categories = new ArrayList();
+// 创建一个名为top10Categories的ArrayList列表,用于存放Top10Category类型的对象。
+// Top10Category应该是自定义的用于封装要持久化到数据库的前10品类详细信息的实体类,接下来的循环操作会将提取出来的相关数据封装成此类对象添加到这个列表中,
+// 以便后续进行批量插入数据库的操作。
+
+ for (Tuple2 tuple2 : top10CategoryList) {
+ // 开始遍历top10CategoryList列表,其中每个元素tuple2就是包含了排名前10品类相关信息的键值对,通过遍历可以依次提取每个品类的详细信息进行封装处理。
+ String countInfo = tuple2._2;
+ // 从当前遍历的键值对tuple2中获取其值部分(也就是包含品类详细统计信息的字符串,从前面代码逻辑可知这个字符串包含了品类ID、点击次数等多种信息),
+ // 并将其赋值给countInfo变量,后续将从这个字符串中按照特定分隔符提取出具体的各项信息,用于构建Top10Category对象。
+
+ Long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从countInfo字符串中提取出对应Constants.FIELD_CATEGORY_ID(应该是在Constants类中定义好的常量,用于标识品类ID在字符串中的位置等信息)
+ // 这个字段对应的内容,然后将其转换为Long类型,赋值给categoryId变量,从而获取到当前品类的ID信息。
+
+ Long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_CATEGORY));
+ // 同样利用StringUtils工具类,从countInfo字符串中提取对应Constants.FIELD_CLICK_CATEGORY字段对应的内容,并转换为Long类型,
+ // 赋值给clickCount变量,以此获取该品类的点击次数信息,方便后续将这些准确的业务数据封装到对象中持久化到数据库。
+
+ Long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容并转换为Long类型,赋值给orderCount变量,
+ // 这样就获取到了该品类的下单次数信息,用于构建完整的Top10Category对象,记录品类的下单行为情况。
+
+ Long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 再次使用StringUtils工具类按照相同分隔符从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容,将其转换为Long类型,
+ // 赋值给payCount变量,获取该品类的支付次数信息(此处代码中两次使用Constants.FIELD_ORDER_CATEGORY来提取字段,可能存在错误,
+ // 正常应该分别对应不同的常量来准确提取下单次数和支付次数,需结合实际Constants类定义来确认)。
+
+ Top10Category top10Category = new Top10Category();
+ // 创建一个Top10Category类型的对象top10Category,此类是用于封装前10品类详细数据的自定义实体类,接下来将把提取到的各项品类信息设置到这个对象中。
+
+ top10Category.set(taskId, categoryId, clickCount, orderCount, payCount);
+ // 调用top10Category对象的set方法(假设Top10Category类有此方法用于设置对象的各个属性值),
+ // 将外部传入的任务ID(taskId)以及前面提取出来的品类ID、点击次数、下单次数、支付次数等信息设置到top10Category对象中,
+ // 完成对该对象的属性赋值操作,使其封装好了要持久化到数据库的前10品类的详细数据信息。
+
top10Categories.add(top10Category);
+ // 将封装好数据的top10Category对象添加到top10Categories列表中,通过循环不断添加,最终这个列表将包含所有排名前10品类的完整封装对象,
+ // 准备进行批量插入数据库的操作。
}
- //插入数据库
+
+// 插入数据库
DaoFactory.getTop10CategoryDao().batchInsert(top10Categories);
+// 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getTop10CategoryDao方法,
+// 获取到用于操作Top10Category数据的DAO对象,然后调用其batchInsert方法,
+// 将包含要持久化数据的top10Categories列表传入,实现将排名前10品类的详细信息批量插入到数据库中的功能,完成数据持久化的操作流程。
+
return top10CategoryList;
- }
+// 最后将包含排名前10品类相关信息(以Tuple2类型的键值对形式存在)的top10CategoryList列表返回,
+// 方便外部代码根据需要进一步使用这些数据,比如在其他地方展示、进行关联分析等操作。
@@ -1175,300 +1443,722 @@ public class UserVisitAnalyze {
* @param payCategoryRDD
* @return
*/
- private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) {
- JavaPairRDD>> tmpJoinRDD=categoryRDD.leftOuterJoin(clickCategoryRDD);
+ private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) {
+ // 此方法名为joinCategoryAndData,作用是将不同来源的品类相关数据(品类ID以及对应的点击、下单、支付次数等信息)进行关联整合,
+ // 接收四个JavaPairRDD类型的参数,分别是categoryRDD(包含品类ID信息)、clickCategoryRDD(点击品类次数相关信息)、
+ // orderCategoryRDD(下单品类次数相关信息)以及payCategoryRDD(支付品类次数相关信息),最终返回一个整合好数据的JavaPairRDD类型的结果。
+
+ JavaPairRDD>> tmpJoinRDD = categoryRDD.leftOuterJoin(clickCategoryRDD);
+ // 首先对categoryRDD和clickCategoryRDD进行左外连接(leftOuterJoin)操作。左外连接的特点是,以categoryRDD中的元素为基础,
+ // 对于categoryRDD中的每个品类ID,尝试在clickCategoryRDD中查找对应的点击次数信息,如果能找到,则将品类ID与对应的点击次数信息组合在一起;
+ // 如果在clickCategoryRDD中找不到对应的点击次数(即该品类可能没有点击行为),则点击次数部分会用com.google.common.base.Optional类型表示,
+ // 其中Optional用于处理可能存在或不存在的值的情况(在这里表示可能不存在点击次数的情况)。连接后的结果是一个新的JavaPairRDD,
+ // 其键依然是品类ID(Long类型),值是一个包含两个元素的Tuple2,第一个元素是品类ID(与键相同,重复出现,方便后续处理),
+ // 第二个元素是Optional类型,表示对应的点击次数(可能存在也可能不存在),这个新的RDD被赋值给tmpJoinRDD变量,用于后续进一步处理。
+
+ JavaPairRDD tmpRDD = tmpJoinRDD.mapToPair(new PairFunction>>, Long, String>() {
+ // 调用tmpJoinRDD的mapToPair方法,传入一个实现了PairFunction接口的匿名内部类实例,目的是基于tmpJoinRDD中的元素(类型为Tuple2>>),
+ // 按照自定义的逻辑转换生成新的键值对,新键值对的类型是Tuple2,也就是即将生成的tmpRDD的元素类型,这里主要是将前面连接后的复杂数据结构进行整理,提取关键信息并转换格式。
+
+ @Override
+ public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
+ // 重写了PairFunction接口中的call方法,当执行mapToPair操作时,会针对tmpJoinRDD中的每一个元素(每个Tuple2>>类型的键值对),
+ // 调用这个call方法来执行具体的转换逻辑,参数longTuple2Tuple2就是当前正在处理的那个键值对元素。
+
+ Long categoryId = longTuple2Tuple2._1;
+ // 从传入的键值对longTuple2Tuple2中获取其键部分(也就是品类ID),赋值给categoryId变量,后续将基于这个品类ID来构建最终整合后的数据信息。
+
+ com.google.common.base.Optional clickIOptional = longTuple2Tuple2._2._2;
+ // 从longTuple2Tuple2的值部分(是一个Tuple2>类型)中获取第二个元素,即表示点击次数的Optional类型的对象,
+ // 赋值给clickIOptional变量,用于判断是否存在点击次数以及获取具体的点击次数值(如果存在的话)。
+
+ Long clickCount = 0L;
+ // 初始化一个Long类型的变量clickCount,并赋值为0,用于后续在点击次数不存在(Optional中无值)的情况下作为默认值使用,保证数据结构的一致性。
+
+ if (clickIOptional.isPresent()) {
+ clickCount = clickIOptional.get();
+ }
+ // 通过调用isPresent方法判断clickIOptional中是否存在值(也就是是否有对应的点击次数),如果存在,则通过get方法获取其中存储的点击次数值,
+ // 并赋值给clickCount变量,这样就得到了当前品类的实际点击次数(如果有的话)。
- JavaPairRDD tmpRDD=tmpJoinRDD.mapToPair(new PairFunction>>, Long, String>() {
- @Override
- public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
- Long categoryId=longTuple2Tuple2._1;
- com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2;
- Long clickCount=0L;
- if(clickIOptional.isPresent())
- {
- clickCount=clickIOptional.get();
+ String value = Constants.FIELD_CATEGORY_ID + "=" + categoryId + "|" + Constants.FIELD_CLICK_CATEGORY + "=" + clickCount;
+ // 根据一定的格式规则构建一个字符串value,将品类ID(通过Constants.FIELD_CATEGORY_ID常量标识,应该是在Constants类中定义好的用于表示品类ID字段的相关常量)和实际的点击次数(clickCount)
+ // 以“字段名=值”的形式,并用“|”作为分隔符连接起来,形成一个包含品类ID和点击次数信息的字符串,方便后续继续整合下单、支付次数等信息以及持久化等操作使用。
+
+ return new Tuple2(categoryId, value);
+ // 返回一个新的键值对,键为当前的品类ID(categoryId),值为构建好的包含品类ID和点击次数信息的字符串(value),
+ // 这些新生成的键值对将构成新的JavaPairRDD(即前面定义的tmpRDD),用于后续继续和下单、支付次数等数据进行关联整合操作。
}
+ });
- String value=Constants.FIELD_CATEGORY_ID+"="+categoryId+"|"+Constants.FIELD_CLICK_CATEGORY+"="+clickCount;
- return new Tuple2(categoryId,value);
- }
- });
- //join下单的次数
- tmpRDD=tmpRDD.leftOuterJoin(orderCategoryRDD).mapToPair(new PairFunction>>, Long, String>() {
- @Override
- public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
- Long categoryId=longTuple2Tuple2._1;
- com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2;
- Long clickCount=0L;
- String value=longTuple2Tuple2._2._1;
- if(clickIOptional.isPresent())
- {
- clickCount=clickIOptional.get();
+ // join下单的次数
+ tmpRDD = tmpRDD.leftOuterJoin(orderCategoryRDD).mapToPair(new PairFunction>>, Long, String>() {
+ // 在上一步得到的tmpRDD基础上,先对其与orderCategoryRDD进行左外连接操作,然后再调用mapToPair方法,传入一个实现了PairFunction接口的匿名内部类实例,
+ // 目的同样是对连接后的结果数据进行整理转换,将下单次数信息整合到之前已经包含品类ID和点击次数信息的字符串中,进一步完善每个品类的综合数据信息。
+
+ @Override
+ public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
+ // 重写的call方法,在执行相关操作时针对连接后的数据元素(每个元素是Tuple2>>类型的键值对)进行处理,
+ // 参数longTuple2Tuple2就是当前正在处理的那个键值对元素。
+
+ Long categoryId = longTuple2Tuple2._1;
+ // 从传入的键值对longTuple2Tuple2中获取其键部分,也就是品类ID,赋值给categoryId变量,用于后续构建整合后的数据信息,确保数据始终围绕同一个品类进行处理。
+
+ com.google.common.base.Optional clickIOptional = longTuple2Tuple2._2._2;
+ // 从longTuple2Tuple2的值部分(Tuple2>类型)中获取第二个元素,即表示下单次数的Optional类型的对象,
+ // 赋值给clickIOptional变量,用于判断是否存在下单次数以及后续获取具体的下单次数值(如果存在的话)。
+
+ Long clickCount = 0L;
+ // 初始化一个Long类型的变量clickCount并赋值为0,作为下单次数不存在时的默认值,保证数据结构完整性,方便后续统一处理不同情况的数据。
+
+ String value = longTuple2Tuple2._2._1;
+ // 从longTuple2Tuple2的值部分中获取第一个元素,也就是之前已经整合好的包含品类ID和点击次数信息的字符串,赋值给value变量,
+ // 后续将在这个字符串基础上继续添加下单次数等信息,逐步完善每个品类的完整数据记录。
+
+ if (clickIOptional.isPresent()) {
+ clickCount = clickIOptional.get();
+ }
+ // 通过调用isPresent方法判断clickIOptional中是否存在值(即是否有对应的下单次数),若存在,则使用get方法获取其中存储的下单次数值,
+ // 并赋值给clickCount变量,这样就得到了当前品类的实际下单次数(如果有的话)。
+
+ value = value + "|" + Constants.FIELD_ORDER_CATEGORY + "=" + clickCount;
+ // 将获取到的下单次数信息按照“字段名=值”的格式(通过Constants.FIELD_ORDER_CATEGORY常量标识下单次数字段相关信息),
+ // 并用“|”作为分隔符添加到之前的value字符串后面,使得value字符串进一步整合了下单次数信息,形成包含品类ID、点击次数、下单次数的综合信息字符串。
+
+ return new Tuple2(categoryId, value);
+ // 返回一个新的键值对,键为品类ID(categoryId),值为更新后的包含品类ID、点击次数、下单次数信息的字符串(value),
+ // 这些新的键值对将重新构成tmpRDD(覆盖之前的tmpRDD内容),使其包含更全面的品类相关数据,用于后续继续整合支付次数等信息。
}
+ });
- value=value+"|"+Constants.FIELD_ORDER_CATEGORY+"="+clickCount;
- return new Tuple2(categoryId,value);
- }
- });
- //join支付的次数
- tmpRDD=tmpRDD.leftOuterJoin(payCategoryRDD).mapToPair(new PairFunction>>, Long, String>() {
- @Override
- public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
- Long categoryId=longTuple2Tuple2._1;
- com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2;
- Long clickCount=0L;
- String value=longTuple2Tuple2._2._1;
- if(clickIOptional.isPresent())
- {
- clickCount=clickIOptional.get();
+ // join支付的次数
+ tmpRDD = tmpRDD.leftOuterJoin(payCategoryRDD).mapToPair(new PairFunction>>, Long, String>() {
+ // 继续在已经整合了品类ID、点击次数、下单次数信息的tmpRDD基础上,与payCategoryRDD进行左外连接操作,然后调用mapToPair方法,
+ // 通过传入的匿名内部类实例定义的逻辑,将支付次数信息也整合到现有的每个品类的数据字符串中,最终形成完整的包含品类各种行为次数信息的综合数据结构。
+
+ @Override
+ public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception {
+ // 重写的call方法,用于处理当前连接后的数据元素(每个元素是Tuple2>>类型的键值对),
+ // 参数longTuple2Tuple2就是正在处理的那个键值对元素。
+
+ Long categoryId = longTuple2Tuple2._1;
+ // 从传入的键值对longTuple2Tuple2中获取其键部分,也就是品类ID,赋值给categoryId变量,确保后续所有操作都是围绕这个品类ID对应的信息进行整合完善。
+
+ com.google.common.base.Optional clickIOptional = longTuple2Tuple2._2._2;
+ // 从longTuple2Tuple2的值部分(Tuple2>类型)中获取第二个元素,即表示支付次数的Optional类型的对象,
+ // 赋值给clickIOptional变量,用于判断是否存在支付次数以及后续获取具体的支付次数值(如果存在的话)。
+
+ Long clickCount = 0L;
+ // 初始化一个Long类型的变量clickCount并赋值为0,作为支付次数不存在时的默认值,保证数据结构统一,便于统一处理不同情况的数据记录。
+
+ String value = longTuple2Tuple2._2._1;
+ // 从longTuple2Tuple2的值部分获取第一个元素,也就是已经整合了品类ID、点击次数、下单次数信息的字符串,赋值给value变量,
+ // 后续将在此字符串基础上添加支付次数信息,形成最终完整的品类综合数据字符串。
+
+ if (clickIOptional.isPresent()) {
+ clickCount = clickIOptional.get();
+ }
+ // 通过调用isPresent方法判断clickIOptional中是否存在值(即是否有对应的支付次数),若存在,则使用get方法获取其中存储的支付次数值,
+ // 并赋值给clickCount变量,这样就得到了当前品类的实际支付次数(如果有的话)。
+
+ value = value + "|" + Constants.FIELD_PAY_CATEGORY + "=" + clickCount;
+ // 按照“字段名=值”的格式(通过Constants.FIELD_PAY_CATEGORY常量标识支付次数字段相关信息),
+ // 以“|”作为分隔符将支付次数信息添加到value字符串后面,使得value字符串最终整合了品类ID、点击次数、下单次数、支付次数等所有相关信息,形成完整的综合数据记录字符串。
+
+ return new Tuple2(categoryId, value);
+ // 返回一个新的键值对,键为品类ID(categoryId),值为包含了品类ID以及点击、下单、支付次数等完整信息的字符串(value),
+ // 这些新生成的键值对构成更新后的tmpRDD,现在tmpRDD中每个元素的字符串值包含了该品类的全面行为次数统计信息,准备返回给调用者使用。
}
+ });
+
+ return tmpRDD;
+ // 将经过多次连接和数据整合操作后得到的tmpRDD返回,这个RDD的元素是键值对,键为品类ID,值为一个包含了该品类的点击、下单、支付次数等详细统计信息的字符串,
+ // 方便外部代码基于这个整合好的数据进行后续操作,比如排序、取前N条数据、持久化到数据库等处理。
+ }
+
+
- value=value+"|"+Constants.FIELD_PAY_CATEGORY+"="+clickCount;
- return new Tuple2(categoryId,value);
- }
- });
- return tmpRDD;
- }
//后去支付品类RDD
- private static JavaPairRDD getPayCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
- JavaPairRDD payActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() {
- @Override
- public Boolean call(Tuple2 stringRowTuple2) throws Exception {
- Row row=stringRowTuple2._2;
- String categoryIds=row.getString(10);
- if(categoryIds==null||"".equals(categoryIds)) return false;
- return true;
- }
- });
- //映射成为新的Pair
- JavaPairRDD payCategoryRDD=payActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
- @Override
- public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
- List> orderCategoryIds=new ArrayList>();
- Row row=stringRowTuple2._2;
- String payCategoryIdsSplited[]=row.getString(10).split(",");
- for (String payCategoryId:
- payCategoryIdsSplited) {
- orderCategoryIds.add(new Tuple2(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId)));
+ private static JavaPairRDD getPayCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
+ // 此方法名为getPayCategoryRDD,其目的是从给定的JavaPairRDD类型的sessionId2DetailRDD中提取出与支付品类相关的信息,
+ // 经过一系列处理后,最终返回一个JavaPairRDD类型的结果,其中键和值大概率都与支付品类相关(比如品类ID以及对应的支付次数等,从后续代码逻辑推测)。
+
+ JavaPairRDD payActionRDD = sessionId2DetailRDD.filter(new Function, Boolean>() {
+ // 调用sessionId2DetailRDD的filter方法,传入一个实现了Function接口的匿名内部类实例,
+ // 这个filter操作的作用是对sessionId2DetailRDD中的元素(每个元素是Tuple2类型的键值对)进行筛选,
+ // 只保留满足特定条件的元素,生成一个新的JavaPairRDD类型的payActionRDD,用于后续进一步提取支付品类相关数据。
+
+ @Override
+ public Boolean call(Tuple2 stringRowTuple2) throws Exception {
+ // 重写了Function接口中的call方法,当执行filter操作时,会针对sessionId2DetailRDD中的每一个元素(即每个Tuple2类型的键值对)调用这个call方法,
+ // 参数stringRowTuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
+
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(也就是Row类型的数据),赋值给row变量,
+ // Row类型通常代表一行包含多个字段的数据,后续将从这个row中提取出与支付品类相关的字段信息。
+
+ String categoryIds = row.getString(10);
+ // 从row数据行中获取索引为10的字段(从代码上下文推测这个字段存储的是支付品类的ID集合,以字符串形式存在,不过具体要结合数据结构定义确定),
+ // 并将其赋值给categoryIds变量,后续将基于这个变量来判断是否存在支付品类相关信息。
+
+ if (categoryIds == null || "".equals(categoryIds)) return false;
+ // 判断categoryIds是否为空(即不存在支付品类ID集合)或者是否为空字符串(可能表示没有实际有效的支付品类ID),
+ // 如果满足这两种情况之一,则返回false,表示当前这个键值对对应的元素不符合筛选条件,将会被过滤掉;
+ // 只有当categoryIds不为空且不是空字符串时,才表示存在支付品类相关信息,该元素符合筛选条件,会保留在后续生成的payActionRDD中。
+
+ return true;
+ // 返回true,表示当前处理的这个键值对对应的元素满足筛选条件,应该保留在经过筛选后的payActionRDD中,用于后续进一步处理。
}
- return orderCategoryIds;
- }
- });
+ });
+
+ // 映射成为新的Pair
+ JavaPairRDD payCategoryRDD = payActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ // 调用payActionRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
+ // 此操作的目的是将payActionRDD中的元素(类型为Tuple2)按照自定义的逻辑转换为零个、一个或多个新的键值对(输出类型为Tuple2),
+ // 以便将支付品类相关信息整理成更便于后续处理(比如计算支付品类次数等)的键值对形式。
+
+ @Override
+ public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
+ // 重写了PairFlatMapFunction接口中的call方法,当执行flatMapToPair操作时,会针对payActionRDD中的每一个元素(每个Tuple2类型的键值对)调用这个call方法,
+ // 参数stringRowTuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
+
+ List> orderCategoryIds = new ArrayList>();
+ // 创建一个名为orderCategoryIds的ArrayList列表,用于存放即将生成的与支付品类相关的键值对信息,
+ // 其键和值的类型都为Long,后续会根据从数据中提取出的支付品类ID来构建并添加这些键值对到列表中。
+
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(即Row类型的数据),赋值给row变量,再次获取数据行对象,方便后续提取支付品类ID信息。
+
+ String payCategoryIdsSplited[] = row.getString(10).split(",");
+ // 从row数据行中获取索引为10的字段(前面已提到该字段存储支付品类的ID集合,以逗号分隔的字符串形式存在),
+ // 通过split方法以逗号作为分隔符将其拆分成一个字符串数组payCategoryIdsSplited,数组中的每个元素就是一个支付品类的ID(以字符串形式呈现)。
+
+ for (String payCategoryId :
+ payCategoryIdsSplited) {
+ orderCategoryIds.add(new Tuple2(Long.valueOf(payCategoryId), Long.valueOf(payCategoryId)));
+ // 遍历拆分后的支付品类ID字符串数组payCategoryIdsSplited,对于其中的每个字符串形式的支付品类ID(payCategoryId),
+ // 先将其转换为Long类型,然后创建一个键值对,键和值都设置为这个转换后的Long类型的支付品类ID,
+ // 最后将这个键值对添加到orderCategoryIds列表中,这样就将每个支付品类ID整理成了方便后续统计等操作的键值对形式。
+ }
+ return orderCategoryIds;
+ // 返回包含了所有支付品类相关键值对信息的orderCategoryIds列表,这些键值对将会构成新的JavaPairRDD(即前面定义的payCategoryRDD)中的元素,
+ // 用于后续计算支付品类的次数等操作。
+ }
+ });
+
+ // 计算次数
+ JavaPairRDD payCategoryCountRDD = payCategoryRDD.reduceByKey(new Function2() {
+ // 调用payCategoryRDD的reduceByKey方法,传入一个实现了Function2接口的匿名内部类实例,
+ // 这个操作的目的是对payCategoryRDD中具有相同键(这里推测键是支付品类ID)的元素的值(同样推测是与支付品类相关的某个计数信息,比如出现次数等)进行聚合操作,
+ // 最终生成一个新的JavaPairRDD类型的payCategoryCountRDD,其值就是经过聚合计算后的每个支付品类对应的次数等统计信息。
+
+ @Override
+ public Long call(Long aLong, Long aLong2) throws Exception {
+ // 重写了Function2接口中的call方法,当执行reduceByKey操作时,对于payCategoryRDD中具有相同键的每一组元素,
+ // 会调用这个call方法来执行具体的聚合逻辑,参数aLong和aLong2就是同一组中不同元素的值部分(都是Long类型,根据前面推测可能是计数相关的值)。
+
+ return aLong + aLong2;
+ // 在这里定义的聚合逻辑很简单,就是将同一组中不同元素的值相加,也就是将同一个支付品类ID对应的多个计数信息进行累加,
+ // 得到这个支付品类总的计数(比如总的支付次数等,结合业务逻辑推测),返回的结果就是聚合后该支付品类对应的最终次数信息。
+ }
+ });
+
+ return payCategoryCountRDD;
+ // 将经过前面一系列处理,最终得到的包含支付品类ID以及对应的支付次数信息(通过聚合计算得出)的JavaPairRDD类型的payCategoryCountRDD返回,
+ // 方便外部代码根据需要进一步使用这些数据,例如和其他品类相关数据(如点击品类、下单品类的数据)进行整合分析等操作。
+ }
- //计算次数
- JavaPairRDD payCategoryCountRDD=payCategoryRDD.reduceByKey(new Function2() {
- @Override
- public Long call(Long aLong, Long aLong2) throws Exception {
- return aLong+aLong2;
- }
- });
- return payCategoryCountRDD;
- }
//获取下单品类RDD
- private static JavaPairRDD getOrderCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
- JavaPairRDD orderActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() {
- @Override
- public Boolean call(Tuple2 stringRowTuple2) throws Exception {
- Row row=stringRowTuple2._2;
- String categoryIds=row.getString(8);
- if(categoryIds==null||"".equals(categoryIds)) return false;
- return true;
- }
- });
- //映射成为新的Pair
- JavaPairRDD orderCategoryRDD=orderActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
- @Override
- public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
- List> orderCategoryIds=new ArrayList>();
- Row row=stringRowTuple2._2;
- String orderCategoryIdsSplited[]=row.getString(8).split(",");
- for (String orderCategoryId:
- orderCategoryIdsSplited) {
- orderCategoryIds.add(new Tuple2(Long.valueOf(orderCategoryId),Long.valueOf(orderCategoryId)));
+ private static JavaPairRDD getOrderCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
+ // 此方法名为getOrderCategoryRDD,其功能是从给定的JavaPairRDD类型的sessionId2DetailRDD中提取出与下单品类相关的信息,
+ // 通过一系列的数据处理操作,最终返回一个JavaPairRDD类型的结果,这个结果大概率包含了下单品类ID以及对应的下单次数等相关数据(从代码逻辑推测)。
+
+ JavaPairRDD orderActionRDD = sessionId2DetailRDD.filter(new Function, Boolean>() {
+ // 调用sessionId2DetailRDD的filter方法,传入一个实现了Function接口的匿名内部类实例,
+ // 该filter操作的目的是对sessionId2DetailRDD里的元素(每个元素是Tuple2类型的键值对)进行筛选,
+ // 只保留符合特定条件的元素,生成新的JavaPairRDD类型的orderActionRDD,为后续提取下单品类相关数据做准备。
+
+ @Override
+ public Boolean call(Tuple2 stringRowTuple2) throws Exception {
+ // 重写了Function接口中的call方法,当执行filter操作时,会针对sessionId2DetailRDD中的每一个元素(即每个Tuple2类型的键值对)调用这个call方法,
+ // 参数stringRowTuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
+
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(也就是Row类型的数据),赋值给row变量,
+ // Row类型通常代表一行包含多个字段的数据,后续会从这个row中提取出与下单品类相关的字段信息。
+
+ String categoryIds = row.getString(8);
+ // 从row数据行中获取索引为8的字段(从代码上下文推测该字段存储的是下单品类的ID集合,以字符串形式存在,不过具体要结合数据结构定义确定),
+ // 并将其赋值给categoryIds变量,后续会基于这个变量来判断是否存在下单品类相关信息。
+
+ if (categoryIds == null || "".equals(categoryIds)) return false;
+ // 判断categoryIds是否为空(即不存在下单品类ID集合)或者是否为空字符串(意味着没有实际有效的下单品类ID),
+ // 如果满足这两种情况之一,则返回false,表示当前这个键值对对应的元素不符合筛选条件,会被过滤掉;
+ // 只有当categoryIds不为空且不是空字符串时,才说明存在下单品类相关信息,该元素符合筛选条件,可保留在后续生成的orderActionRDD中。
+
+ return true;
+ // 返回true,表示当前处理的这个键值对对应的元素满足筛选条件,应当保留在经过筛选后的orderActionRDD里,用于后续进一步的数据处理。
}
- return orderCategoryIds;
- }
- });
+ });
- //计算次数
- JavaPairRDD orderCategoryCountRDD=orderCategoryRDD.reduceByKey(new Function2() {
- @Override
- public Long call(Long aLong, Long aLong2) throws Exception {
- return aLong+aLong2;
- }
- });
+ // 映射成为新的Pair
+ JavaPairRDD orderCategoryRDD = orderActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ // 调用orderActionRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
+ // 其作用是把orderActionRDD中的元素(类型为Tuple2)按照自定义的逻辑转换为零个、一个或多个新的键值对(输出类型是Tuple2),
+ // 以便将下单品类相关信息整理成更便于后续处理(比如计算下单品类次数等)的键值对形式。
- return orderCategoryCountRDD;
- }
+ @Override
+ public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
+ // 重写了PairFlatMapFunction接口中的call方法,当执行flatMapToPair操作时,会针对orderActionRDD中的每一个元素(每个Tuple2类型的键值对)调用这个call方法,
+ // 参数stringRowTuple2就是正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
- //获取点击品类RDD
- private static JavaPairRDD getLClickCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
- JavaPairRDD clickActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() {
- @Override
- public Boolean call(Tuple2 stringRowTuple2) throws Exception {
- Row row=stringRowTuple2._2;
- if(row.get(6)==null) return false;
- return true;
- }
- });
- //映射成为新的Pair
- JavaPairRDD clickCategoryRDD=clickActionRDD.mapToPair(new PairFunction, Long,Long>() {
- @Override
- public Tuple2 call(Tuple2 stringRowTuple2) throws Exception {
- Long row=stringRowTuple2._2.getLong(6);
- return new Tuple2(row,1L);
- }
- });
+ List> orderCategoryIds = new ArrayList>();
+ // 创建一个名为orderCategoryIds的ArrayList列表,用于存放即将生成的与下单品类相关的键值对信息,
+ // 其键和值的类型都为Long,后续会根据从数据中提取出的下单品类ID来构建并添加这些键值对到列表中。
- //计算次数
- JavaPairRDD clickCategoryCountRDD=clickCategoryRDD.reduceByKey(new Function2() {
- @Override
- public Long call(Long aLong, Long aLong2) throws Exception {
- return aLong+aLong2;
- }
- });
- return clickCategoryCountRDD;
- }
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(也就是Row类型的数据),赋值给row变量,再次获取数据行对象,方便后续提取下单品类ID信息。
+ String orderCategoryIdsSplited[] = row.getString(8).split(",");
+ // 从row数据行中获取索引为8的字段(前面提到该字段存储下单品类的ID集合,以逗号分隔的字符串形式存在),
+ // 通过split方法以逗号作为分隔符将其拆分成一个字符串数组orderCategoryIdsSplited,数组中的每个元素就是一个下单品类的ID(以字符串形式呈现)。
- //获取每一个品类的Session Top10
- private static void getTop10Session(JavaSparkContext sc, final Long taskId, JavaPairRDD sessionInfoPairRDD, List> top10CategoryIds) {
- List> categoryIdList=new ArrayList>();
- for(Tuple2 top10CategoryId:top10CategoryIds)
- {
- String countInfo=top10CategoryId._2;
- Long categoryId=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CATEGORY_ID));
- categoryIdList.add(new Tuple2(categoryId,categoryId));
- }
- //生成一份RDD
- JavaPairRDD