|
|
|
@ -1855,25 +1855,17 @@ public class UserVisitAnalyze {
|
|
|
|
|
// 并赋值给`countInfo`变量,后续将从这个字符串里按照特定的分隔规则提取出品类ID相关内容。
|
|
|
|
|
|
|
|
|
|
Long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
|
|
|
|
|
// 借助`StringUtils`工具类(应该是自定义的用于字符串处理的工具类)的`getFieldFromConcatString`方法,
|
|
|
|
|
// 按照“|”作为分隔符,从`countInfo`字符串中提取出对应`Constants.FIELD_CATEGORY_ID`(推测是在`Constants`类中定义好的常量,用于明确品类ID在字符串中的位置等信息)
|
|
|
|
|
// 这个字段对应的内容,然后将其转换为`Long`类型,赋值给`categoryId`变量,这样就获取到了当前品类的ID信息。
|
|
|
|
|
|
|
|
|
|
categoryIdList.add(new Tuple2<Long, Long>(categoryId, categoryId));
|
|
|
|
|
// 将获取到的品类ID信息封装成一个键值对(这里键和值都设置为该品类ID),然后添加到`categoryIdList`列表中,
|
|
|
|
|
// 这么做可能是为了后续方便统一数据格式,便于生成RDD以及基于品类ID与其他数据做关联操作等。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 生成一份RDD
|
|
|
|
|
JavaPairRDD<Long, Long> top10CategoryIdsRDD = sc.parallelizePairs(categoryIdList);
|
|
|
|
|
// 使用`JavaSparkContext`(`sc`)的`parallelizePairs`方法,把包含品类ID信息的`categoryIdList`列表转换为一个`JavaPairRDD<Long, Long>`类型的RDD(即`top10CategoryIdsRDD`),
|
|
|
|
|
// 该RDD以键值对形式存储了排名前10的品类的ID信息(键和值都是品类ID),方便后续在Spark环境下基于品类ID与其他RDD进行各种操作,例如进行连接(`join`)等操作。
|
|
|
|
|
|
|
|
|
|
// 按照SessionId进行分组
|
|
|
|
|
JavaPairRDD<String, Iterable<Row>> gourpBySessionIdRDD = sessionInfoPairRDD.groupByKey();
|
|
|
|
|
// 调用`sessionInfoPairRDD`的`groupByKey`方法,对这个RDD按照其键(也就是`SessionId`)进行分组操作,
|
|
|
|
|
// 分组后的结果是一个新的`JavaPairRDD<String, Iterable<Row>>`类型的`gourpBySessionIdRDD`,其键为`SessionId`,值是一个可迭代的`Row`类型的集合,
|
|
|
|
|
// 这意味着每个`SessionId`对应的所有会话详细信息(以`Row`类型表示,通常每行数据包含多个字段)会被放在一起,方便后续针对每个会话做进一步的相关统计分析等操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 计算每一个session对品类的点击次数
|
|
|
|
|
JavaPairRDD<Long, String> categorySessionCountRDD = gourpBySessionIdRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
|
|
|
|
@ -1883,29 +1875,17 @@ public class UserVisitAnalyze {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Iterable<Tuple2<Long, String>> call(Tuple2<String, Iterable<Row>> tuple2) throws Exception {
|
|
|
|
|
// 重写了`PairFlatMapFunction`接口中的`call`方法,当执行`flatMapToPair`操作时,针对`gourpBySessionIdRDD`中的每一个元素(每个`Tuple2<String, Iterable<Row>>`类型的键值对)都会调用这个`call`方法来执行具体的转换逻辑,
|
|
|
|
|
// 参数`tuple2`就是当前正在处理的那个键值对元素,其类型符合接口定义中的`Tuple2<String, Iterable<Row>>`。
|
|
|
|
|
|
|
|
|
|
String sessionId = tuple2._1;
|
|
|
|
|
// 从传入的键值对`tuple2`中获取其键部分(也就是当前会话的`SessionId`),并赋值给`sessionId`变量,
|
|
|
|
|
// 后续的所有操作都会基于这个会话ID来统计该会话对不同品类的点击次数等信息,确保数据处理是围绕着同一个会话进行的。
|
|
|
|
|
|
|
|
|
|
// 保存每一个品类的单击次数
|
|
|
|
|
Map<Long, Long> categoryIdCount = new HashMap<Long, Long>();
|
|
|
|
|
// 创建一个名为`categoryIdCount`的`HashMap`,用于存储每个品类ID(以`Long`类型作为键)对应的点击次数(以`Long`类型作为值),
|
|
|
|
|
// 这个`Map`会在遍历当前会话的详细信息时,用来记录和更新各个品类的点击次数情况。
|
|
|
|
|
|
|
|
|
|
for (Row row : tuple2._2) {
|
|
|
|
|
// 开始遍历`tuple2`的值部分(是一个可迭代的`Row`类型的集合,代表当前`SessionId`对应的所有会话详细信息行),
|
|
|
|
|
// 通过遍历这些行数据,可以获取每行中的相关字段信息,以此来判断是否有品类点击行为,并相应地更新对应品类的点击次数。
|
|
|
|
|
|
|
|
|
|
if (row.get(6) != null) {
|
|
|
|
|
// 判断当前行数据(`Row`类型的`row`)中索引为6的字段(从代码上下文推测这个字段代表点击品类的ID,不过具体还是要结合数据结构定义来确定)是否为空,
|
|
|
|
|
// 如果不为空,说明存在点击品类相关的信息,那就需要对该品类的点击次数进行处理了。
|
|
|
|
|
|
|
|
|
|
Long count = categoryIdCount.get(row.getLong(6));
|
|
|
|
|
// 从`categoryIdCount`这个`Map`中获取当前点击品类的ID(通过`row.getLong(6)`获取点击品类的ID,并以此作为键)对应的点击次数,
|
|
|
|
|
// 如果该品类ID之前已经存在于`Map`中,那么就能获取到对应的点击次数值(可能初始为0或者是之前累计的次数),要是不存在则会返回`null`。
|
|
|
|
|
|
|
|
|
|
if (count == null) {
|
|
|
|
|
count = 0L;
|
|
|
|
@ -1917,26 +1897,26 @@ public class UserVisitAnalyze {
|
|
|
|
|
|
|
|
|
|
categoryIdCount.put(row.getLong(6), count);
|
|
|
|
|
// 把更新后的点击次数(`count`变量的值)重新放回`categoryIdCount`这个`Map`中,以当前点击品类的ID(`row.getLong(6)`)作为键,
|
|
|
|
|
// 这样就完成了对该品类点击次数的更新操作,保证`Map`里始终记录着每个品类在当前会话中的最新点击次数情况。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<Tuple2<Long, String>> categoryIdCountList = new ArrayList<Tuple2<Long, String>>();
|
|
|
|
|
// 创建一个名为`categoryIdCountList`的`ArrayList`列表,用于存放即将生成的包含品类ID以及对应点击次数信息的键值对,
|
|
|
|
|
// 其键为品类ID(`Long`类型),值是一个包含会话ID和点击次数的字符串(`String`类型),方便后续整合数据以及转换为RDD进行处理。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (Map.Entry<Long, Long> entry : categoryIdCount.entrySet()) {
|
|
|
|
|
// 开始遍历`categoryIdCount`这个`Map`中的所有键值对(每个键值对代表一个品类ID及其对应的点击次数),
|
|
|
|
|
// 通过遍历这些键值对,可以依次提取每个品类的相关信息,进而构建成新的键值对添加到`categoryIdCountList`列表中。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 将数据拼接
|
|
|
|
|
String value = sessionId + "," + entry.getValue();
|
|
|
|
|
// 根据一定的格式规则构建一个字符串`value`,把当前会话的`ID`(`sessionId`)和当前品类的点击次数(通过`entry.getValue()`获取到的`Long`类型的点击次数,转换为字符串后)
|
|
|
|
|
// 用逗号作为分隔符连接起来,形成一个包含会话ID和点击次数信息的字符串,方便后续继续整合其他信息以及做持久化等操作使用。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
categoryIdCountList.add(new Tuple2<Long, String>(entry.getKey(), value));
|
|
|
|
|
// 将品类ID(通过`entry.getKey()`获取到的`Long`类型的品类ID)和构建好的包含会话ID与点击次数信息的字符串(`value`)封装成一个新的键值对,
|
|
|
|
|
// 然后添加到`categoryIdCountList`列表中,这样就把每个品类在当前会话中的点击次数信息整理成了便于后续处理的键值对形式。
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
return categoryIdCountList;
|
|
|
|
|
// 返回包含了所有品类在当前会话中的点击次数信息(以键值对形式存在,键为品类ID,值为包含会话ID和点击次数的字符串)的`categoryIdCountList`列表,
|
|
|
|
@ -1954,7 +1934,7 @@ public class UserVisitAnalyze {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<Long, String> call(Tuple2<Long, Tuple2<Long, String>> tuple2) throws Exception {
|
|
|
|
|
// 重写了PairFunction接口中的call方法,当执行mapToPair操作时,会针对连接后的结果数据中的每一个元素(每个Tuple2<Long, Tuple2<Long, String>>类型的键值对)调用这个call方法,
|
|
|
|
|
// 参数tuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2<Long, Tuple2<Long, String>>。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return new Tuple2<Long, String>(tuple2._1, tuple2._2._2);
|
|
|
|
|
// 从传入的键值对tuple2中提取其键部分(也就是品类ID,tuple2._1)以及值部分中包含点击次数信息的字符串(tuple2._2._2),
|
|
|
|
|