|
|
|
@ -1958,21 +1958,18 @@ public class UserVisitAnalyze {
|
|
|
|
|
|
|
|
|
|
return new Tuple2<Long, String>(tuple2._1, tuple2._2._2);
|
|
|
|
|
// 从传入的键值对tuple2中提取其键部分(也就是品类ID,tuple2._1)以及值部分中包含点击次数信息的字符串(tuple2._2._2),
|
|
|
|
|
// 创建一个新的键值对,键为品类ID,值为包含点击次数信息的字符串,这样就得到了每一个热门品类在各个会话中的点击次数相关信息的键值对形式,
|
|
|
|
|
// 这些新的键值对将构成新的JavaPairRDD<Long, String>(即前面定义的top10CategorySessionCountRDD),用于后续基于热门品类的分组等操作。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// 根据品类分组
|
|
|
|
|
JavaPairRDD<Long, Iterable<String>> top10CategorySessionCountGroupRDD = top10CategorySessionCountRDD.groupByKey();
|
|
|
|
|
// 调用top10CategorySessionCountRDD的groupByKey方法,对其按照键(也就是品类ID)进行分组操作,
|
|
|
|
|
// 分组后的结果是一个新的JavaPairRDD<Long, Iterable<String>>类型的top10CategorySessionCountGroupRDD,其键为品类ID,值是一个可迭代的字符串集合,
|
|
|
|
|
// 每个集合中的字符串包含了对应品类在不同会话中的点击次数等相关信息,方便后续针对每个品类汇总分析其在各个会话中的情况,例如找出每个品类对应的点击次数较多的会话等操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
JavaPairRDD<String, String> top10CategorySessionRDD = top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, Iterable<String>>, String, String>() {
|
|
|
|
|
// 调用top10CategorySessionCountGroupRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
|
|
|
|
|
// 此操作的目的是将分组后的top10CategorySessionCountGroupRDD中的元素(类型为Tuple2<Long, Iterable<String>>)按照自定义的逻辑转换为零个、一个或多个新的键值对(输出类型是Tuple2<String, String>),
|
|
|
|
|
// 在这里主要是从每个品类对应的会话点击次数信息中提取出排名前10的会话信息,并整理成相应的键值对形式,便于后续持久化到数据库等操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Iterable<Tuple2<String, String>> call(Tuple2<Long, Iterable<String>> tuple2) throws Exception {
|
|
|
|
@ -1994,20 +1991,18 @@ public class UserVisitAnalyze {
|
|
|
|
|
|
|
|
|
|
List<Tuple2<String, String>> sessionIdList = new ArrayList<Tuple2<String, String>>();
|
|
|
|
|
// 创建一个名为sessionIdList的ArrayList列表,用于存放即将生成的包含会话ID的键值对,其键和值都为会话ID(String类型),
|
|
|
|
|
// 后续会将排名前10的会话ID信息整理成这样的键值对添加到这个列表中,用于返回给外部或者进行其他相关操作(比如和其他数据关联等)。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (String sessionCount : tuple2._2) {
|
|
|
|
|
// 开始遍历tuple2的值部分(是一个可迭代的字符串集合,每个字符串包含了对应品类在某个会话中的点击次数等相关信息),通过遍历可以依次获取每个会话的相关信息,
|
|
|
|
|
// 用于判断是否能进入排名前10的会话列表以及更新相应的排名情况。
|
|
|
|
|
|
|
|
|
|
String[] sessionCountSplited = sessionCount.split(",");
|
|
|
|
|
// 将当前遍历到的包含会话信息的字符串(sessionCount)按照逗号作为分隔符进行拆分,得到一个字符串数组sessionCountSplited,
|
|
|
|
|
// 数组中第一个元素(索引为0)应该是会话ID,第二个元素(索引为1)应该是点击次数(从前面构建字符串的逻辑推测),方便后续分别提取使用。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// String sessionId = sessionCountSplited[0];
|
|
|
|
|
Long count = Long.valueOf(sessionCountSplited[1]);
|
|
|
|
|
// 从拆分后的字符串数组sessionCountSplited中获取第二个元素(索引为1),也就是点击次数信息,并将其转换为Long类型,赋值给count变量,
|
|
|
|
|
// 用于后续和已有的排名前10的会话信息进行比较,判断是否能进入前10的列表。
|
|
|
|
|
|
|
|
|
|
// 获取TopN
|
|
|
|
|
for (int i = 0; i < top10Sessions.length; i++) {
|
|
|
|
@ -2024,10 +2019,7 @@ public class UserVisitAnalyze {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// 上述循环用于将当前会话的点击次数信息(sessionCount)与已有的top10Sessions数组中的会话信息进行比较,
|
|
|
|
|
// 如果top10Sessions数组中某个位置为空(即还未满10个会话信息),则直接将当前会话信息放入该位置;
|
|
|
|
|
// 如果该位置已有会话信息,则比较它们的点击次数(通过解析字符串获取),如果当前会话的点击次数大于已有位置的点击次数,
|
|
|
|
|
// 则将已有位置及之后的会话信息依次往后移动一位(腾出当前位置),然后将当前会话信息放入该位置,以此来动态维护每个品类的前10个点击次数最多的会话信息列表。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 封装数据
|
|
|
|
@ -2041,33 +2033,19 @@ public class UserVisitAnalyze {
|
|
|
|
|
sessionIdList.add(new Tuple2<String, String>(sessionId, sessionId));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// 对于top10Sessions数组中不为空的每个位置(也就是每个品类对应的排名前10的会话信息),进行如下操作:
|
|
|
|
|
// 首先创建一个Top10CategorySession类型的对象top10CategorySession,然后从对应的会话信息字符串(top10Sessions[i])中解析出会话ID(通过split方法获取第一个元素)
|
|
|
|
|
// 和点击次数(通过split方法获取第二个元素并转换为Long类型),接着调用top10CategorySession对象的set方法(假设该类有这样的方法用于设置属性值),
|
|
|
|
|
// 将外部传入的任务ID(taskId)、当前品类ID(categoryId)、解析出的会话ID(sessionId)以及点击次数(count)设置到对象中,
|
|
|
|
|
// 将封装好数据的top10CategorySession对象添加到top10CategorySessionList列表中,同时将会话ID封装成键值对(键和值都为会话ID)添加到sessionIdList列表中,
|
|
|
|
|
// 这样top10CategorySessionList就包含了所有需要持久化到数据库的排名前10的品类会话详细信息对象,而sessionIdList则包含了对应的会话ID信息键值对。
|
|
|
|
|
|
|
|
|
|
// 批量插入数据库
|
|
|
|
|
DaoFactory.getTop10CategorySessionDao().batchInsert(top10CategorySessionList);
|
|
|
|
|
// 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getTop10CategorySessionDao方法,
|
|
|
|
|
// 获取到用于操作Top10CategorySession数据的DAO对象,然后调用其batchInsert方法,
|
|
|
|
|
// 将包含要持久化数据的top10CategorySessionList列表传入,实现将排名前10的品类会话详细信息批量插入到数据库中的功能,完成数据持久化的操作流程。
|
|
|
|
|
|
|
|
|
|
return sessionIdList;
|
|
|
|
|
// 返回包含会话ID信息键值对的sessionIdList列表,虽然这里代码逻辑上返回了这个列表,但从整体功能来看,重点在于前面已经完成了将排名前10的品类会话详细信息插入数据库的操作,
|
|
|
|
|
// 这个返回值可能在后续代码(如果有的话)中用于其他关联操作或者只是作为一种函数执行结果的返回形式存在,具体要结合整体的代码上下文来进一步确定其用途。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//3. 获取session的明细数据保存到数据库
|
|
|
|
|
JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD = top10CategorySessionRDD.join(sessionInfoPairRDD);
|
|
|
|
|
// 对top10CategorySessionRDD(前面经过一系列处理得到的与排名前10的品类会话相关的RDD,其键值对类型为<String, String>,从前面代码逻辑推测键可能是会话相关信息,值是对应信息字符串)
|
|
|
|
|
// 和sessionInfoPairRDD(包含完整会话信息的RDD,键值对类型为<String, Row>,键是会话ID,值是包含多个字段的Row类型的会话详细数据行)进行连接(join)操作。
|
|
|
|
|
// 连接的目的是将排名前10的品类会话相关信息与完整的会话详细数据进行整合,生成一个新的JavaPairRDD<String, Tuple2<String, Row>>类型的sessionDetailRDD,
|
|
|
|
|
// 其键为某个用于关联的标识(从前面逻辑推测可能与会话相关),值是一个包含两个元素的Tuple2,第一个元素是字符串(可能是前面top10品类会话相关的部分信息),第二个元素是Row类型的完整会话详细数据,
|
|
|
|
|
// 方便后续基于整合后的数据提取出需要持久化到数据库的详细会话信息。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sessionDetailRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Tuple2<String, Row>>>>() {
|
|
|
|
|
// 调用sessionDetailRDD的foreachPartition方法,传入一个实现了VoidFunction接口的匿名内部类实例,
|
|
|
|
@ -2077,87 +2055,80 @@ public class UserVisitAnalyze {
|
|
|
|
|
@Override
|
|
|
|
|
public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> tuple2Iterator) throws Exception {
|
|
|
|
|
// 重写了VoidFunction接口中的call方法,当foreachPartition方法遍历每个分区时,会针对每个分区对应的迭代器(Iterator<Tuple2<String, Tuple2<String, Row>>>类型,其中元素是包含会话相关信息的复杂键值对)调用这个call方法,
|
|
|
|
|
// 参数tuple2Iterator就是当前正在处理的那个分区对应的迭代器,通过它可以遍历该分区内的所有元素(每个元素是Tuple2<String, Tuple2<String, Row>>类型的键值对)进行具体的数据提取和处理操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
List<SessionDetail> sessionDetailList = new ArrayList<SessionDetail>();
|
|
|
|
|
// 创建一个名为sessionDetailList的ArrayList列表,用于存放SessionDetail类型的对象,
|
|
|
|
|
// SessionDetail应该是自定义的用于封装要持久化到数据库的详细会话信息的实体类,后续会将从分区数据中提取出来的各个会话的详细信息封装成此类对象添加到这个列表中,
|
|
|
|
|
// 以便进行批量插入数据库的操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while (tuple2Iterator.hasNext()) {
|
|
|
|
|
// 通过调用迭代器(tuple2Iterator)的hasNext方法判断当前分区内是否还有未处理的元素(即Tuple2<String, Tuple2<String, Row>>类型的键值对),如果有则进入循环进行处理。
|
|
|
|
|
|
|
|
|
|
Tuple2<String, Tuple2<String, Row>> tuple2 = tuple2Iterator.next();
|
|
|
|
|
// 调用迭代器(tuple2Iterator)的next方法获取下一个要处理的元素(键值对),并赋值给tuple2变量,
|
|
|
|
|
// tuple2的类型为Tuple2<String, Tuple2<String, Row>>,其第一个元素(tuple2._1)可能是会话相关的标识信息,第二个元素(tuple2._2)是一个包含字符串和Row类型数据的Tuple2,
|
|
|
|
|
// 后续将从这个复杂的结构中提取出具体的会话详细字段信息进行封装。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Row row = tuple2._2._2;
|
|
|
|
|
// 从tuple2的值部分(即Tuple2<String, Row>类型的元素)中获取第二个元素(也就是Row类型的数据),赋值给row变量,
|
|
|
|
|
// Row类型通常代表一行包含多个字段的会话详细数据,后续将从这个row中提取出各个具体的字段信息,用于构建SessionDetail对象。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String sessionId = tuple2._1;
|
|
|
|
|
// 从tuple2中获取其键部分(也就是前面提到的可能与会话相关的标识信息),赋值给sessionId变量,这里将其作为会话的ID信息,
|
|
|
|
|
// 后续会把这个会话ID设置到SessionDetail对象中,确保数据的完整性和关联性。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Long userId = row.getLong(1);
|
|
|
|
|
// 从row数据行中获取索引为1的字段(从代码上下文推测该字段存储的是用户ID信息,不过具体要结合数据结构定义确定),并将其转换为Long类型,赋值给userId变量,
|
|
|
|
|
// 用于后续将用户ID信息封装到SessionDetail对象中,记录该会话对应的用户情况。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Long pageId = row.getLong(3);
|
|
|
|
|
// 从row数据行中获取索引为3的字段(推测该字段存储的是页面ID等相关信息,具体依据数据结构定义),转换为Long类型后赋值给pageId变量,
|
|
|
|
|
// 以便将页面相关信息添加到SessionDetail对象中,完善会话详细信息的记录。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String actionTime = row.getString(4);
|
|
|
|
|
// 从row数据行中获取索引为4的字段(可能是会话操作时间等相关信息,依据数据结构而定),赋值给actionTime变量,
|
|
|
|
|
// 后续会把这个时间信息设置到SessionDetail对象中,用于记录会话发生的时间情况。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String searchKeyWard = row.getString(5);
|
|
|
|
|
// 从row数据行中获取索引为5的字段(可能是搜索关键词等相关信息,根据实际数据结构来确定),赋值给searchKeyWard变量,
|
|
|
|
|
// 用于将搜索相关情况添加到SessionDetail对象中,更全面地记录会话的详细内容。
|
|
|
|
|
|
|
|
|
|
Long clickCategoryId = row.getLong(6);
|
|
|
|
|
// 从row数据行中获取索引为6的字段(推测是点击品类的ID信息,结合前面代码逻辑判断),转换为Long类型后赋值给clickCategoryId变量,
|
|
|
|
|
// 这样就能把点击品类相关信息纳入到SessionDetail对象中,记录会话中涉及的品类点击情况。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Long clickProducetId = row.getLong(7);
|
|
|
|
|
// 从row数据行中获取索引为7的字段(可能是点击产品的ID信息,根据数据结构定义),赋值给clickProducetId变量,
|
|
|
|
|
// 以便在SessionDetail对象中记录点击产品的相关情况,丰富会话详细信息的内容。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String orderCategoryId = row.getString(8);
|
|
|
|
|
// 从row数据行中获取索引为8的字段(推测是下单品类的ID相关信息,从整体业务逻辑推测),赋值给orderCategoryId变量,
|
|
|
|
|
// 后续会将下单品类信息添加到SessionDetail对象中,用于记录会话中涉及的下单品类情况。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String orderProducetId = row.getString(9);
|
|
|
|
|
// 从row数据行中获取索引为9的字段(可能是下单产品的ID相关信息,依据数据结构确定),赋值给orderProducetId变量,
|
|
|
|
|
// 以便把下单产品相关情况设置到SessionDetail对象中,更完整地记录会话中下单相关的详细信息。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String payCategoryId = row.getString(10);
|
|
|
|
|
// 从row数据行中获取索引为10的字段(推测是支付品类的ID相关信息,结合业务逻辑判断),赋值给payCategoryId变量,
|
|
|
|
|
// 用于将支付品类信息纳入到SessionDetail对象中,记录会话中涉及的支付品类情况。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String payProducetId = row.getString(11);
|
|
|
|
|
// 从row数据行中获取索引为11的字段(可能是支付产品的ID相关信息,根据数据结构定义),赋值给payProducetId变量,
|
|
|
|
|
// 这样就能在SessionDetail对象中记录支付产品的相关情况,完善会话详细信息的记录,使其包含会话涉及的各种业务行为相关的详细信息。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SessionDetail sessionDetail = new SessionDetail();
|
|
|
|
|
// 创建一个SessionDetail类型的对象sessionDetail,此类是用于封装完整会话详细信息的自定义实体类,接下来将把前面提取到的各个字段信息设置到这个对象中。
|
|
|
|
|
|
|
|
|
|
sessionDetail.set(taskId, userId, sessionId, pageId, actionTime, searchKeyWard, clickCategoryId, clickProducetId, orderCategoryId, orderProducetId, payCategoryId, payProducetId);
|
|
|
|
|
// 调用sessionDetail对象的set方法(假设SessionDetail类有此方法用于设置对象的各个属性值),
|
|
|
|
|
// 将外部传入的任务ID(taskId)以及前面提取出来的用户ID、会话ID、页面ID、操作时间、搜索关键词、点击品类ID、点击产品ID、下单品类ID、下单产品ID、支付品类ID、支付产品ID等信息设置到sessionDetail对象中,
|
|
|
|
|
// 完成对该对象的属性赋值操作,使其封装好了当前会话的完整详细信息,准备添加到列表中进行批量插入数据库操作。
|
|
|
|
|
|
|
|
|
|
sessionDetailList.add(sessionDetail);
|
|
|
|
|
// 将封装好数据的sessionDetail对象添加到sessionDetailList列表中,通过循环不断添加,最终这个列表将包含当前分区内所有会话的完整详细信息对象,
|
|
|
|
|
// 待整个分区的数据都处理完后,就可以进行批量插入数据库的操作了。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
|
|
|
|
|
// 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getSessionDetailDao方法,
|
|
|
|
|
// 获取到用于操作SessionDetail数据的DAO对象,然后调用其batchInsert方法,
|
|
|
|
|
// 将包含要持久化数据的sessionDetailList列表传入,实现将当前分区内所有会话的详细信息批量插入到数据库中的功能,
|
|
|
|
|
// 完成对每个分区数据的持久化操作,整个流程遍历完所有分区后,就将所有相关会话的详细信息都保存到数据库中了。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|