1.环境搭建完成

2.相关的工具类编写完成11
3.配置文件管理类编写完成1
zxr
劝酒千百 9 months ago
parent a757942f5c
commit e2745fe65c

@ -4,7 +4,7 @@
<option name="autoReloadType" value="ALL" />
</component>
<component name="ChangeListManager">
<list default="true" id="a7505764-040b-48e2-b2fc-8c5b579e595f" name="Default" comment="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成">
<list default="true" id="a7505764-040b-48e2-b2fc-8c5b579e595f" name="Default" comment="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成1">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java" afterDir="false" />
</list>
@ -480,7 +480,7 @@
<workItem from="1734354940766" duration="9337000" />
<workItem from="1734364968190" duration="929000" />
<workItem from="1734365941294" duration="2674000" />
<workItem from="1734368738786" duration="302000" />
<workItem from="1734368738786" duration="983000" />
</task>
<task id="LOCAL-00001" summary="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成">
<created>1529592741848</created>
@ -616,7 +616,23 @@
<option name="project" value="LOCAL" />
<updated>1734368636062</updated>
</task>
<option name="localTasksCounter" value="18" />
<task id="LOCAL-00018" summary="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成1">
<option name="closed" value="true" />
<created>1734369061766</created>
<option name="number" value="00018" />
<option name="presentableId" value="LOCAL-00018" />
<option name="project" value="LOCAL" />
<updated>1734369061766</updated>
</task>
<task id="LOCAL-00019" summary="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成1">
<option name="closed" value="true" />
<created>1734369482371</created>
<option name="number" value="00019" />
<option name="presentableId" value="LOCAL-00019" />
<option name="project" value="LOCAL" />
<updated>1734369482371</updated>
</task>
<option name="localTasksCounter" value="20" />
<servers />
</component>
<component name="TestHistory">
@ -742,7 +758,8 @@
<MESSAGE value="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成11" />
<MESSAGE value="1.环境搭建完成11&#10;2.相关的工具类编写完成11&#10;3.配置文件管理类编写完成11" />
<MESSAGE value="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成" />
<option name="LAST_COMMIT_MESSAGE" value="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成" />
<MESSAGE value="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成1" />
<option name="LAST_COMMIT_MESSAGE" value="1.环境搭建完成&#10;2.相关的工具类编写完成&#10;3.配置文件管理类编写完成1" />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/src/main/java/cn/edu/hust/conf/ConfigurationManager.java">

@ -1,249 +1,55 @@
//这行代码定义了当前类所在的包名。包名用于组织和管理类。
package cn.edu.hust.session;
//导入 cn.edu.hust.conf 包下的 ConfigurationManager 类。这个类可能用于管理配置信息
import cn.edu.hust.conf.ConfigurationManager;
//导入 cn.edu.hust.constant 包下的 Constants 类。这个类可能包含一些常量定义。
import cn.edu.hust.constant.Constants;
//导入 cn.edu.hust.dao 包下的 TaskDao 类。这个类可能用于与任务相关的数据访问操作。
import cn.edu.hust.dao.TaskDao;
//导入 cn.edu.hust.dao.factory 包下的 DaoFactory 类。这个类可能用于创建 DAO 对象。
import cn.edu.hust.dao.factory.DaoFactory;
//导入 cn.edu.hust.domain 包下的所有类。这个包可能包含一些数据模型类。
import cn.edu.hust.domain.*;
//导入 cn.edu.hust.mockData 包下的 MockData 类。这个类可能用于生成模拟数据
import cn.edu.hust.mockData.MockData;
//导入 cn.edu.hust.util 包下的所有类。这个包可能包含一些通用工具类。
import cn.edu.hust.util.*;
//导入 com.alibaba.fastjson.JSONObject 类。FastJSON 是一个用于处理 JSON 数据的库。
import com.alibaba.fastjson.JSONObject;
//导入 org.apache.spark.Accumulator 类。这个类用于在 Spark 作业中共享和累加数据。
import org.apache.spark.Accumulator;
//导入 org.apache.spark.SparkConf 类。这个类用于配置 Spark 应用程序。
import org.apache.spark.SparkConf;
//导入 org.apache.spark.SparkContext 类。这个类是 Spark 应用程序的主入口点。
import org.apache.spark.SparkContext;
//导入 org.apache.spark.api.java.JavaPairRDD 类。
// 这个类是用于处理键值对的 RDD弹性分布式数据集
import org.apache.spark.api.java.JavaPairRDD;
//导入 org.apache.spark.api.java.JavaRDD 类。这个类是用于处理普通 RDD 的 Java API。
import org.apache.spark.api.java.JavaRDD;
//导入了 JavaSparkContext 类。
// JavaSparkContext 是 org.apache.spark.api.java 包下的一个类,提供了与 Spark 集群交互的 Java API。
import org.apache.spark.api.java.JavaSparkContext;
//导入 org.apache.spark.api.java.function 包下的所有函数类。
// 这些类提供了常见的操作函数,如 FlatMapFunction, MapFunction, FilterFunction 等。
import org.apache.spark.api.java.function.*;
//导入 org.apache.spark.sql.DataFrame 类。这个类是用于操作结构化数据的。
import org.apache.spark.sql.DataFrame;
//导入 org.apache.spark.sql.Row 类。这个类是用于表示一行数据的。
import org.apache.spark.sql.Row;
//导入 org.apache.spark.sql.SQLContext 类。
// 这个类用于创建 Spark SQL 的上下文环境
import org.apache.spark.sql.SQLContext;
//导入 org.apache.spark.sql.hive.HiveContext 类。
//这个类用于与 Hive 兼容的 Spark SQL 上下文环境
import org.apache.spark.sql.hive.HiveContext;
//导入 org.apache.spark.storage.StorageLevel 类。
// 这个类定义了 Spark 中数据存储的级别。
import org.apache.spark.storage.StorageLevel;
//导入 scala.Tuple2 类。这个类用于表示一个二元组(一个包含两个元素的元组)。
import scala.Tuple2;
import java.util.*;
/**
*
* 1
* 2
* 3
* 4
* 5
* 6
*/
public class UserVisitAnalyze {
public static void main(String[] args)
{
//初始化 args 参数数组,用于传递给 main 方法。
// 这里暂时设置为 {"1"},实际使用时可以根据需要调整。
args=new String[]{"1"};
/**
* spark
*/
//创建 SparkConf 对象并设置应用程序名称和运行模式本地模式使用3个核心
//使用 SparkConf 创建 JavaSparkContext 实例初始化Spark上下文。
SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
JavaSparkContext context=new JavaSparkContext(conf);
//通过 JavaSparkContext 获取 SQLContext用于执行SQL查询。
SQLContext sc=getSQLContext(context.sc());
//生成模拟数据,调用 mock 方法生成模拟数据到Spark环境中。
mock(context,sc);
//拿到相应的Dao组建
//通过 DaoFactory 获取 TaskDao 实例,用于数据库操作。
TaskDao dao= DaoFactory.getTaskDao();
//从外部传入的参数获取任务的id
//通过 args 参数获取任务ID。
Long taskId=ParamUtils.getTaskIdFromArgs(args);
//从数据库中查询出相应的task
//通过任务ID从数据库中查询任务信息并将任务参数解析为 JSONObject
Task task=dao.findTaskById(taskId);
JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
//获取指定范围内的Sesssion
//调用 getActionRDD 方法获取包含指定范围内的Session的RDD。
JavaRDD<Row> sessionRangeDate=getActionRDD(sc,jsonObject);
//这里增加一个新的方法,主要是映射
//将 sessionRangeDate 转换为包含键值对的 PairRDD。
JavaPairRDD<String,Row> sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
//重复用到的RDD进行持久化
//将 sessionInfoPairRDD 持久化到磁盘,提高后续操作的性能
public static void main(String[] args) {
args = new String[]{"1"};
SparkConf conf = new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
JavaSparkContext context = new JavaSparkContext(conf);
SQLContext sc = getSQLContext(context.sc());
mock(context, sc);
TaskDao dao = DaoFactory.getTaskDao();
Long taskId = ParamUtils.getTaskIdFromArgs(args);
Task task = dao.findTaskById(taskId);
JSONObject jsonObject = JSONObject.parseObject(task.getTaskParam());
JavaRDD<Row> sessionRangeDate = getActionRDD(sc, jsonObject);
JavaPairRDD<String, Row> sessionInfoPairRDD = getSessonInfoPairRDD(sessionRangeDate);
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
//按照Sesson进行聚合
//调用 aggregateBySessionId 方法对Session信息进行聚合。
JavaPairRDD<String,String> sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
//通过条件对RDD进行筛选
//使用Accumulator进行统计
//创建一个 Accumulator 来统计聚合结果。
Accumulator<String> sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
//过滤和统计Session信息
//在进行accumulator之前需要aciton动作不然会为空
//调用 filterSessionAndAggrStat 方法过滤并统计Session信息。
JavaPairRDD<String,String> filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
//重复用到的RDD进行持久化
//将过滤后的RDD持久化到磁盘。
JavaPairRDD<String, String> sesssionAggregateInfoRDD = aggregateBySessionId(sc, sessionInfoPairRDD);
Accumulator<String> sessionAggrStatAccumulator = context.accumulator("", new SessionAggrStatAccumulator());
JavaPairRDD<String, String> filteredSessionRDD = filterSessionAndAggrStat(sesssionAggregateInfoRDD, jsonObject, sessionAggrStatAccumulator);
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
//获取符合过滤条件的全信息公共RDD
//调用 getFilterFullInfoRDD 方法获取包含完整信息的公共RDD。
JavaPairRDD<String, Row> commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
//重复用到的RDD进行持久化
//将公共RDD持久化到磁盘。
JavaPairRDD<String, Row> commonFullClickInfoRDD = getFilterFullInfoRDD(filteredSessionRDD, sessionInfoPairRDD);
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
//session聚合统计统计出访问时长和访问步长的各个区间所占的比例
/**
*
* 1RDD
*
* 2sesion
*
* 3session访访
*
* 4Accumulator
*
* Spark
*
* 1RDD
*
* 2RDD
*
* 3RDDshufflegroupBykeyreduceBykeysortByKey
*
* shuffle
* shuffleshuffle
* shuffle
*
* 4
*
*
*
*
*/
/**
* 使CountByKey
*/
//调用 randomExtractSession 方法实现随机抽取功能。
randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
//在使用Accumulutor之前需要使用Action算子否则获取的值为空这里随机计算
//filteredSessionRDD.count();
//计算各个session占比,并写入MySQL
//调用 calculateAndPersist 方法计算并持久化聚合统计结果。
calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
//调用 getTop10Category 方法获取热门品类数据Top10
List<Tuple2<CategorySortKey,String>> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
//调用 getTop10Session 方法获取热门品类点击Top10Session。
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
//关闭Spark上下文释放资源
randomExtractSession(taskId, filteredSessionRDD, sessionInfoPairRDD);
calculateAndPersist(sessionAggrStatAccumulator.value(), taskId);
List<Tuple2<CategorySortKey, String>> top10CategoryIds = getTop10Category(taskId, commonFullClickInfoRDD);
getTop10Session(context, taskId, sessionInfoPairRDD, top10CategoryIds);
context.close();
}
/**
*
* SparkSpark
* Spark
*
* Session
*
*
*
* Spark
* 访
*/
}
@ -368,14 +174,14 @@ public class UserVisitAnalyze {
//定义方法签名:
//定义一个静态方法 getSessonInfoPairRDD该方法接受一个 JavaRDD<Row> 类型的参数 sessionRangeDate
// 并返回一个 JavaPairRDD<String, Row>。
private static JavaPairRDD<String,Row> getSessonInfoPairRDD(JavaRDD<Row> sessionRangeDate) {
private static JavaPairRDD<String,Row> getSessonInfoPairRDD(JavaRDD<Row> sessionRangeDate) {
//使用 mapToPair 方法:
//使用 mapToPair 方法将 JavaRDD<Row> 转换为 JavaPairRDD<String, Row>。
// mapToPair 方法接受一个 PairFunction 实现类,
// 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2<String, Row>。
return sessionRangeDate.mapToPair(new PairFunction<Row, String, Row>() {
//使用 mapToPair 方法将 JavaRDD<Row> 转换为 JavaPairRDD<String, Row>。
// mapToPair 方法接受一个 PairFunction 实现类,
// 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2<String, Row>。
return sessionRangeDate.mapToPair(new PairFunction<Row, String, Row>() {
@Override
//实现 PairFunction
//实现 PairFunction 接口的 call 方法。
@ -388,16 +194,16 @@ public class UserVisitAnalyze {
return new Tuple2<String, Row>(row.getString(2),row);
}
});
/**
*
* 使 mapToPair Row Tuple2<String, Row>
* Row 3 Row
* Row 便
* getSessonInfoPairRDD JavaRDD<Row> JavaPairRDD<String, Row>
* Row sessionId
* Row
* sessionId
*/
/**
*
* 使 mapToPair Row Tuple2<String, Row>
* Row 3 Row
* Row 便
* getSessonInfoPairRDD JavaRDD<Row> JavaPairRDD<String, Row>
* Row sessionId
* Row
* sessionId
*/
}
@ -420,11 +226,11 @@ public class UserVisitAnalyze {
/**
*
JavaPairRDD<String,Row> sessionActionPair=sessionRangeDate.mapToPair(new PairFunction<Row, String,Row>() {
@Override
public Tuple2<String, Row> call(Row row) throws Exception {
return new Tuple2<String, Row>(row.getString(2),row);
}
JavaPairRDD<String,Row> sessionActionPair=sessionRangeDate.mapToPair(new PairFunction<Row, String,Row>() {
@Override
public Tuple2<String, Row> call(Row row) throws Exception {
return new Tuple2<String, Row>(row.getString(2),row);
}
});*/
/**
* sessionId
@ -497,7 +303,7 @@ public class UserVisitAnalyze {
{
endTime=actionTime;
}
stepLength++;
stepLength++;
}
@ -516,22 +322,22 @@ public class UserVisitAnalyze {
});
//查询所有的用户数据
String sql="select * from user_info";
String sql="select * from user_info";
//执行SQL查询获取用户信息并将结果转换为 JavaRDD<Row>。
JavaRDD<Row> userInfoRDD=sc.sql(sql).javaRDD();
//执行SQL查询获取用户信息并将结果转换为 JavaRDD<Row>。
JavaRDD<Row> userInfoRDD=sc.sql(sql).javaRDD();
//将用户信息映射成map
//将用户信息映射成map
//使用 mapToPair 方法将 JavaRDD<Row> 转换为 JavaPairRDD<Long, Row>
// 其中键为 Row 对象中的某个列(通常是 userId值为整个 Row 对象。
JavaPairRDD<Long,Row> userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>() {
@Override
public Tuple2<Long, Row> call(Row row) throws Exception {
return new Tuple2<Long, Row>(row.getLong(0),row);
}
});
//将两个信息join在一起
JavaPairRDD<Long,Row> userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>() {
@Override
public Tuple2<Long, Row> call(Row row) throws Exception {
return new Tuple2<Long, Row>(row.getLong(0),row);
}
});
//将两个信息join在一起
//使用 join 方法将两个 JavaPairRDD 进行连接,连接键为 userId。结果是一个 JavaPairRDD<Long, Tuple2<String, Row>>
// 其中键为 userId值为一个包含用户会话信息和用户信息的元组。
JavaPairRDD<Long,Tuple2<String,Row>> tuple2JavaPairRDD=sessionPartInfo.join(userInfoPariRDD);
@ -737,7 +543,7 @@ public class UserVisitAnalyze {
else if(stepLength>30&&stepLength<=60)
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
else if(stepLength>60)
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
});
//返回过滤后的会话数据 filteredSessionRDD。
@ -847,7 +653,7 @@ public class UserVisitAnalyze {
// 从键中分割出小时部分,以"_"作为分隔符取第二个元素。
String hour=entry.getKey().split("_")[1];
// 根据日期从dateHourCountMap中获取对应的内层Map存放小时和数量的映射
// 根据日期从dateHourCountMap中获取对应的内层Map存放小时和数量的映射
// 如果不存在则返回null。
Map<String,Long> hourCount=dateHourCountMap.get(date);
@ -1435,14 +1241,14 @@ public class UserVisitAnalyze {
/**
*
* @param categoryRDD
* @param clickCategoryRDD
* @param orderCategoryRDD
* @param payCategoryRDD
* @return
*/
/**
*
* @param categoryRDD
* @param clickCategoryRDD
* @param orderCategoryRDD
* @param payCategoryRDD
* @return
*/
private static JavaPairRDD<Long, String> joinCategoryAndData(JavaPairRDD<Long, Long> categoryRDD, JavaPairRDD<Long, Long> clickCategoryRDD, JavaPairRDD<Long, Long> orderCategoryRDD, JavaPairRDD<Long, Long> payCategoryRDD) {
// 此方法名为joinCategoryAndData作用是将不同来源的品类相关数据品类ID以及对应的点击、下单、支付次数等信息进行关联整合
// 接收四个JavaPairRDD类型的参数分别是categoryRDD包含品类ID信息、clickCategoryRDD点击品类次数相关信息
@ -1579,7 +1385,7 @@ public class UserVisitAnalyze {
//后去支付品类RDD
//后去支付品类RDD
private static JavaPairRDD<Long, Long> getPayCategoryRDD(JavaPairRDD<String, Row> sessionId2DetailRDD) {
// 此方法名为getPayCategoryRDD其目的是从给定的JavaPairRDD<String, Row>类型的sessionId2DetailRDD中提取出与支付品类相关的信息
// 经过一系列处理后最终返回一个JavaPairRDD<Long, Long>类型的结果其中键和值大概率都与支付品类相关比如品类ID以及对应的支付次数等从后续代码逻辑推测
@ -1671,7 +1477,7 @@ public class UserVisitAnalyze {
//获取下单品类RDD
//获取下单品类RDD
private static JavaPairRDD<Long, Long> getOrderCategoryRDD(JavaPairRDD<String, Row> sessionId2DetailRDD) {
// 此方法名为getOrderCategoryRDD其功能是从给定的JavaPairRDD<String, Row>类型的sessionId2DetailRDD中提取出与下单品类相关的信息
// 通过一系列的数据处理操作最终返回一个JavaPairRDD<Long, Long>类型的结果这个结果大概率包含了下单品类ID以及对应的下单次数等相关数据从代码逻辑推测
@ -1835,7 +1641,7 @@ public class UserVisitAnalyze {
//获取每一个品类的Session Top10
//获取每一个品类的Session Top10
private static void getTop10Session(JavaSparkContext sc, final Long taskId, JavaPairRDD<String, Row> sessionInfoPairRDD, List<Tuple2<CategorySortKey, String>> top10CategoryIds)
{
// 此方法名为`getTop10Session`作用是基于给定的Spark上下文`JavaSparkContext`、任务ID`taskId`)、
@ -1855,17 +1661,25 @@ public class UserVisitAnalyze {
// 并赋值给`countInfo`变量后续将从这个字符串里按照特定的分隔规则提取出品类ID相关内容。
Long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
// 借助`StringUtils`工具类(应该是自定义的用于字符串处理的工具类)的`getFieldFromConcatString`方法,
// 按照“|”作为分隔符,从`countInfo`字符串中提取出对应`Constants.FIELD_CATEGORY_ID`(推测是在`Constants`类中定义好的常量用于明确品类ID在字符串中的位置等信息
// 这个字段对应的内容,然后将其转换为`Long`类型,赋值给`categoryId`变量这样就获取到了当前品类的ID信息。
categoryIdList.add(new Tuple2<Long, Long>(categoryId, categoryId));
// 将获取到的品类ID信息封装成一个键值对这里键和值都设置为该品类ID然后添加到`categoryIdList`列表中,
// 这么做可能是为了后续方便统一数据格式便于生成RDD以及基于品类ID与其他数据做关联操作等。
}
// 生成一份RDD
JavaPairRDD<Long, Long> top10CategoryIdsRDD = sc.parallelizePairs(categoryIdList);
// 使用`JavaSparkContext``sc`)的`parallelizePairs`方法把包含品类ID信息的`categoryIdList`列表转换为一个`JavaPairRDD<Long, Long>`类型的RDD即`top10CategoryIdsRDD`
// 该RDD以键值对形式存储了排名前10的品类的ID信息键和值都是品类ID方便后续在Spark环境下基于品类ID与其他RDD进行各种操作例如进行连接`join`)等操作。
// 按照SessionId进行分组
JavaPairRDD<String, Iterable<Row>> gourpBySessionIdRDD = sessionInfoPairRDD.groupByKey();
// 调用`sessionInfoPairRDD`的`groupByKey`方法对这个RDD按照其键也就是`SessionId`)进行分组操作,
// 分组后的结果是一个新的`JavaPairRDD<String, Iterable<Row>>`类型的`gourpBySessionIdRDD`,其键为`SessionId`,值是一个可迭代的`Row`类型的集合,
// 这意味着每个`SessionId`对应的所有会话详细信息(以`Row`类型表示,通常每行数据包含多个字段)会被放在一起,方便后续针对每个会话做进一步的相关统计分析等操作。
// 计算每一个session对品类的点击次数
JavaPairRDD<Long, String> categorySessionCountRDD = gourpBySessionIdRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
@ -1875,17 +1689,29 @@ public class UserVisitAnalyze {
@Override
public Iterable<Tuple2<Long, String>> call(Tuple2<String, Iterable<Row>> tuple2) throws Exception {
// 重写了`PairFlatMapFunction`接口中的`call`方法,当执行`flatMapToPair`操作时,针对`gourpBySessionIdRDD`中的每一个元素(每个`Tuple2<String, Iterable<Row>>`类型的键值对)都会调用这个`call`方法来执行具体的转换逻辑,
// 参数`tuple2`就是当前正在处理的那个键值对元素,其类型符合接口定义中的`Tuple2<String, Iterable<Row>>`。
String sessionId = tuple2._1;
// 从传入的键值对`tuple2`中获取其键部分(也就是当前会话的`SessionId`),并赋值给`sessionId`变量,
// 后续的所有操作都会基于这个会话ID来统计该会话对不同品类的点击次数等信息确保数据处理是围绕着同一个会话进行的。
// 保存每一个品类的单击次数
Map<Long, Long> categoryIdCount = new HashMap<Long, Long>();
// 创建一个名为`categoryIdCount`的`HashMap`用于存储每个品类ID以`Long`类型作为键)对应的点击次数(以`Long`类型作为值),
// 这个`Map`会在遍历当前会话的详细信息时,用来记录和更新各个品类的点击次数情况。
for (Row row : tuple2._2) {
// 开始遍历`tuple2`的值部分(是一个可迭代的`Row`类型的集合,代表当前`SessionId`对应的所有会话详细信息行),
// 通过遍历这些行数据,可以获取每行中的相关字段信息,以此来判断是否有品类点击行为,并相应地更新对应品类的点击次数。
if (row.get(6) != null) {
// 判断当前行数据(`Row`类型的`row`中索引为6的字段从代码上下文推测这个字段代表点击品类的ID不过具体还是要结合数据结构定义来确定是否为空
// 如果不为空,说明存在点击品类相关的信息,那就需要对该品类的点击次数进行处理了。
Long count = categoryIdCount.get(row.getLong(6));
// 从`categoryIdCount`这个`Map`中获取当前点击品类的ID通过`row.getLong(6)`获取点击品类的ID并以此作为键对应的点击次数
// 如果该品类ID之前已经存在于`Map`中那么就能获取到对应的点击次数值可能初始为0或者是之前累计的次数要是不存在则会返回`null`。
if (count == null) {
count = 0L;
@ -1897,26 +1723,26 @@ public class UserVisitAnalyze {
categoryIdCount.put(row.getLong(6), count);
// 把更新后的点击次数(`count`变量的值)重新放回`categoryIdCount`这个`Map`中以当前点击品类的ID`row.getLong(6)`)作为键,
// 这样就完成了对该品类点击次数的更新操作,保证`Map`里始终记录着每个品类在当前会话中的最新点击次数情况。
}
}
List<Tuple2<Long, String>> categoryIdCountList = new ArrayList<Tuple2<Long, String>>();
// 创建一个名为`categoryIdCountList`的`ArrayList`列表用于存放即将生成的包含品类ID以及对应点击次数信息的键值对
// 其键为品类ID`Long`类型值是一个包含会话ID和点击次数的字符串`String`类型方便后续整合数据以及转换为RDD进行处理。
for (Map.Entry<Long, Long> entry : categoryIdCount.entrySet()) {
// 开始遍历`categoryIdCount`这个`Map`中的所有键值对每个键值对代表一个品类ID及其对应的点击次数
// 通过遍历这些键值对,可以依次提取每个品类的相关信息,进而构建成新的键值对添加到`categoryIdCountList`列表中。
// 将数据拼接
String value = sessionId + "," + entry.getValue();
// 根据一定的格式规则构建一个字符串`value`,把当前会话的`ID``sessionId`)和当前品类的点击次数(通过`entry.getValue()`获取到的`Long`类型的点击次数,转换为字符串后)
// 用逗号作为分隔符连接起来形成一个包含会话ID和点击次数信息的字符串方便后续继续整合其他信息以及做持久化等操作使用。
categoryIdCountList.add(new Tuple2<Long, String>(entry.getKey(), value));
// 将品类ID通过`entry.getKey()`获取到的`Long`类型的品类ID和构建好的包含会话ID与点击次数信息的字符串`value`)封装成一个新的键值对,
// 然后添加到`categoryIdCountList`列表中,这样就把每个品类在当前会话中的点击次数信息整理成了便于后续处理的键值对形式。
}
return categoryIdCountList;
// 返回包含了所有品类在当前会话中的点击次数信息以键值对形式存在键为品类ID值为包含会话ID和点击次数的字符串的`categoryIdCountList`列表,
@ -1934,22 +1760,25 @@ public class UserVisitAnalyze {
@Override
public Tuple2<Long, String> call(Tuple2<Long, Tuple2<Long, String>> tuple2) throws Exception {
// 重写了PairFunction接口中的call方法当执行mapToPair操作时会针对连接后的结果数据中的每一个元素每个Tuple2<Long, Tuple2<Long, String>>类型的键值对调用这个call方法
// 参数tuple2就是当前正在处理的那个键值对元素其类型符合接口定义中的Tuple2<Long, Tuple2<Long, String>>。
return new Tuple2<Long, String>(tuple2._1, tuple2._2._2);
// 从传入的键值对tuple2中提取其键部分也就是品类IDtuple2._1以及值部分中包含点击次数信息的字符串tuple2._2._2
// 创建一个新的键值对键为品类ID值为包含点击次数信息的字符串这样就得到了每一个热门品类在各个会话中的点击次数相关信息的键值对形式
// 这些新的键值对将构成新的JavaPairRDD<Long, String>即前面定义的top10CategorySessionCountRDD用于后续基于热门品类的分组等操作。
}
});
// 根据品类分组
JavaPairRDD<Long, Iterable<String>> top10CategorySessionCountGroupRDD = top10CategorySessionCountRDD.groupByKey();
// 调用top10CategorySessionCountRDD的groupByKey方法对其按照键也就是品类ID进行分组操作
// 分组后的结果是一个新的JavaPairRDD<Long, Iterable<String>>类型的top10CategorySessionCountGroupRDD其键为品类ID值是一个可迭代的字符串集合
// 每个集合中的字符串包含了对应品类在不同会话中的点击次数等相关信息,方便后续针对每个品类汇总分析其在各个会话中的情况,例如找出每个品类对应的点击次数较多的会话等操作。
JavaPairRDD<String, String> top10CategorySessionRDD = top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, Iterable<String>>, String, String>() {
// 调用top10CategorySessionCountGroupRDD的flatMapToPair方法传入一个实现了PairFlatMapFunction接口的匿名内部类实例
// 此操作的目的是将分组后的top10CategorySessionCountGroupRDD中的元素类型为Tuple2<Long, Iterable<String>>按照自定义的逻辑转换为零个、一个或多个新的键值对输出类型是Tuple2<String, String>
// 在这里主要是从每个品类对应的会话点击次数信息中提取出排名前10的会话信息并整理成相应的键值对形式便于后续持久化到数据库等操作。
@Override
public Iterable<Tuple2<String, String>> call(Tuple2<Long, Iterable<String>> tuple2) throws Exception {
@ -1971,18 +1800,20 @@ public class UserVisitAnalyze {
List<Tuple2<String, String>> sessionIdList = new ArrayList<Tuple2<String, String>>();
// 创建一个名为sessionIdList的ArrayList列表用于存放即将生成的包含会话ID的键值对其键和值都为会话IDString类型
// 后续会将排名前10的会话ID信息整理成这样的键值对添加到这个列表中用于返回给外部或者进行其他相关操作比如和其他数据关联等
for (String sessionCount : tuple2._2) {
// 开始遍历tuple2的值部分是一个可迭代的字符串集合每个字符串包含了对应品类在某个会话中的点击次数等相关信息通过遍历可以依次获取每个会话的相关信息
// 用于判断是否能进入排名前10的会话列表以及更新相应的排名情况。
String[] sessionCountSplited = sessionCount.split(",");
// 将当前遍历到的包含会话信息的字符串sessionCount按照逗号作为分隔符进行拆分得到一个字符串数组sessionCountSplited
// 数组中第一个元素索引为0应该是会话ID第二个元素索引为1应该是点击次数从前面构建字符串的逻辑推测方便后续分别提取使用。
// String sessionId = sessionCountSplited[0];
Long count = Long.valueOf(sessionCountSplited[1]);
// 从拆分后的字符串数组sessionCountSplited中获取第二个元素索引为1也就是点击次数信息并将其转换为Long类型赋值给count变量
// 用于后续和已有的排名前10的会话信息进行比较判断是否能进入前10的列表。
// 获取TopN
for (int i = 0; i < top10Sessions.length; i++) {
@ -1999,7 +1830,10 @@ public class UserVisitAnalyze {
}
}
}
// 上述循环用于将当前会话的点击次数信息sessionCount与已有的top10Sessions数组中的会话信息进行比较
// 如果top10Sessions数组中某个位置为空即还未满10个会话信息则直接将当前会话信息放入该位置
// 如果该位置已有会话信息,则比较它们的点击次数(通过解析字符串获取),如果当前会话的点击次数大于已有位置的点击次数,
// 则将已有位置及之后的会话信息依次往后移动一位腾出当前位置然后将当前会话信息放入该位置以此来动态维护每个品类的前10个点击次数最多的会话信息列表。
}
// 封装数据
@ -2013,19 +1847,33 @@ public class UserVisitAnalyze {
sessionIdList.add(new Tuple2<String, String>(sessionId, sessionId));
}
}
// 对于top10Sessions数组中不为空的每个位置也就是每个品类对应的排名前10的会话信息进行如下操作
// 首先创建一个Top10CategorySession类型的对象top10CategorySession然后从对应的会话信息字符串top10Sessions[i]中解析出会话ID通过split方法获取第一个元素
// 和点击次数通过split方法获取第二个元素并转换为Long类型接着调用top10CategorySession对象的set方法假设该类有这样的方法用于设置属性值
// 将外部传入的任务IDtaskId、当前品类IDcategoryId、解析出的会话IDsessionId以及点击次数count设置到对象中
// 将封装好数据的top10CategorySession对象添加到top10CategorySessionList列表中同时将会话ID封装成键值对键和值都为会话ID添加到sessionIdList列表中
// 这样top10CategorySessionList就包含了所有需要持久化到数据库的排名前10的品类会话详细信息对象而sessionIdList则包含了对应的会话ID信息键值对。
// 批量插入数据库
DaoFactory.getTop10CategorySessionDao().batchInsert(top10CategorySessionList);
// 通过DaoFactory应该是自定义的数据访问工厂类用于获取数据库操作相关的DAO对象的getTop10CategorySessionDao方法
// 获取到用于操作Top10CategorySession数据的DAO对象然后调用其batchInsert方法
// 将包含要持久化数据的top10CategorySessionList列表传入实现将排名前10的品类会话详细信息批量插入到数据库中的功能完成数据持久化的操作流程。
return sessionIdList;
// 返回包含会话ID信息键值对的sessionIdList列表虽然这里代码逻辑上返回了这个列表但从整体功能来看重点在于前面已经完成了将排名前10的品类会话详细信息插入数据库的操作
// 这个返回值可能在后续代码(如果有的话)中用于其他关联操作或者只是作为一种函数执行结果的返回形式存在,具体要结合整体的代码上下文来进一步确定其用途。
}
});
//3. 获取session的明细数据保存到数据库
JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD = top10CategorySessionRDD.join(sessionInfoPairRDD);
// 对top10CategorySessionRDD前面经过一系列处理得到的与排名前10的品类会话相关的RDD其键值对类型为<String, String>,从前面代码逻辑推测键可能是会话相关信息,值是对应信息字符串)
// 和sessionInfoPairRDD包含完整会话信息的RDD键值对类型为<String, Row>键是会话ID值是包含多个字段的Row类型的会话详细数据行进行连接join操作。
// 连接的目的是将排名前10的品类会话相关信息与完整的会话详细数据进行整合生成一个新的JavaPairRDD<String, Tuple2<String, Row>>类型的sessionDetailRDD
// 其键为某个用于关联的标识从前面逻辑推测可能与会话相关值是一个包含两个元素的Tuple2第一个元素是字符串可能是前面top10品类会话相关的部分信息第二个元素是Row类型的完整会话详细数据
// 方便后续基于整合后的数据提取出需要持久化到数据库的详细会话信息。
sessionDetailRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Tuple2<String, Row>>>>() {
// 调用sessionDetailRDD的foreachPartition方法传入一个实现了VoidFunction接口的匿名内部类实例
@ -2035,81 +1883,89 @@ public class UserVisitAnalyze {
@Override
public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> tuple2Iterator) throws Exception {
// 重写了VoidFunction接口中的call方法当foreachPartition方法遍历每个分区时会针对每个分区对应的迭代器Iterator<Tuple2<String, Tuple2<String, Row>>>类型其中元素是包含会话相关信息的复杂键值对调用这个call方法
// 参数tuple2Iterator就是当前正在处理的那个分区对应的迭代器通过它可以遍历该分区内的所有元素每个元素是Tuple2<String, Tuple2<String, Row>>类型的键值对)进行具体的数据提取和处理操作。
List<SessionDetail> sessionDetailList = new ArrayList<SessionDetail>();
// 创建一个名为sessionDetailList的ArrayList列表用于存放SessionDetail类型的对象
// SessionDetail应该是自定义的用于封装要持久化到数据库的详细会话信息的实体类后续会将从分区数据中提取出来的各个会话的详细信息封装成此类对象添加到这个列表中
// 以便进行批量插入数据库的操作。
while (tuple2Iterator.hasNext()) {
// 通过调用迭代器tuple2Iterator的hasNext方法判断当前分区内是否还有未处理的元素即Tuple2<String, Tuple2<String, Row>>类型的键值对),如果有则进入循环进行处理。
Tuple2<String, Tuple2<String, Row>> tuple2 = tuple2Iterator.next();
// 调用迭代器tuple2Iterator的next方法获取下一个要处理的元素键值对并赋值给tuple2变量
// tuple2的类型为Tuple2<String, Tuple2<String, Row>>其第一个元素tuple2._1可能是会话相关的标识信息第二个元素tuple2._2是一个包含字符串和Row类型数据的Tuple2
// 后续将从这个复杂的结构中提取出具体的会话详细字段信息进行封装。
Row row = tuple2._2._2;
// 从tuple2的值部分即Tuple2<String, Row>类型的元素中获取第二个元素也就是Row类型的数据赋值给row变量
// Row类型通常代表一行包含多个字段的会话详细数据后续将从这个row中提取出各个具体的字段信息用于构建SessionDetail对象。
String sessionId = tuple2._1;
// 从tuple2中获取其键部分也就是前面提到的可能与会话相关的标识信息赋值给sessionId变量这里将其作为会话的ID信息
// 后续会把这个会话ID设置到SessionDetail对象中确保数据的完整性和关联性。
Long userId = row.getLong(1);
// 从row数据行中获取索引为1的字段从代码上下文推测该字段存储的是用户ID信息不过具体要结合数据结构定义确定并将其转换为Long类型赋值给userId变量
// 用于后续将用户ID信息封装到SessionDetail对象中记录该会话对应的用户情况。
Long pageId = row.getLong(3);
// 从row数据行中获取索引为3的字段推测该字段存储的是页面ID等相关信息具体依据数据结构定义转换为Long类型后赋值给pageId变量
// 以便将页面相关信息添加到SessionDetail对象中完善会话详细信息的记录。
String actionTime = row.getString(4);
// 从row数据行中获取索引为4的字段可能是会话操作时间等相关信息依据数据结构而定赋值给actionTime变量
// 后续会把这个时间信息设置到SessionDetail对象中用于记录会话发生的时间情况。
String searchKeyWard = row.getString(5);
// 从row数据行中获取索引为5的字段可能是搜索关键词等相关信息根据实际数据结构来确定赋值给searchKeyWard变量
// 用于将搜索相关情况添加到SessionDetail对象中更全面地记录会话的详细内容。
Long clickCategoryId = row.getLong(6);
// 从row数据行中获取索引为6的字段推测是点击品类的ID信息结合前面代码逻辑判断转换为Long类型后赋值给clickCategoryId变量
// 这样就能把点击品类相关信息纳入到SessionDetail对象中记录会话中涉及的品类点击情况。
Long clickProducetId = row.getLong(7);
// 从row数据行中获取索引为7的字段可能是点击产品的ID信息根据数据结构定义赋值给clickProducetId变量
// 以便在SessionDetail对象中记录点击产品的相关情况丰富会话详细信息的内容。
String orderCategoryId = row.getString(8);
// 从row数据行中获取索引为8的字段推测是下单品类的ID相关信息从整体业务逻辑推测赋值给orderCategoryId变量
// 后续会将下单品类信息添加到SessionDetail对象中用于记录会话中涉及的下单品类情况。
String orderProducetId = row.getString(9);
// 从row数据行中获取索引为9的字段可能是下单产品的ID相关信息依据数据结构确定赋值给orderProducetId变量
// 以便把下单产品相关情况设置到SessionDetail对象中更完整地记录会话中下单相关的详细信息。
String payCategoryId = row.getString(10);
// 从row数据行中获取索引为10的字段推测是支付品类的ID相关信息结合业务逻辑判断赋值给payCategoryId变量
// 用于将支付品类信息纳入到SessionDetail对象中记录会话中涉及的支付品类情况。
String payProducetId = row.getString(11);
// 从row数据行中获取索引为11的字段可能是支付产品的ID相关信息根据数据结构定义赋值给payProducetId变量
// 这样就能在SessionDetail对象中记录支付产品的相关情况完善会话详细信息的记录使其包含会话涉及的各种业务行为相关的详细信息。
SessionDetail sessionDetail = new SessionDetail();
// 创建一个SessionDetail类型的对象sessionDetail此类是用于封装完整会话详细信息的自定义实体类接下来将把前面提取到的各个字段信息设置到这个对象中。
sessionDetail.set(taskId, userId, sessionId, pageId, actionTime, searchKeyWard, clickCategoryId, clickProducetId, orderCategoryId, orderProducetId, payCategoryId, payProducetId);
// 调用sessionDetail对象的set方法假设SessionDetail类有此方法用于设置对象的各个属性值
// 将外部传入的任务IDtaskId以及前面提取出来的用户ID、会话ID、页面ID、操作时间、搜索关键词、点击品类ID、点击产品ID、下单品类ID、下单产品ID、支付品类ID、支付产品ID等信息设置到sessionDetail对象中
// 完成对该对象的属性赋值操作,使其封装好了当前会话的完整详细信息,准备添加到列表中进行批量插入数据库操作。
sessionDetailList.add(sessionDetail);
// 将封装好数据的sessionDetail对象添加到sessionDetailList列表中通过循环不断添加最终这个列表将包含当前分区内所有会话的完整详细信息对象
// 待整个分区的数据都处理完后,就可以进行批量插入数据库的操作了。
}
DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
// 通过DaoFactory应该是自定义的数据访问工厂类用于获取数据库操作相关的DAO对象的getSessionDetailDao方法
// 获取到用于操作SessionDetail数据的DAO对象然后调用其batchInsert方法
// 将包含要持久化数据的sessionDetailList列表传入实现将当前分区内所有会话的详细信息批量插入到数据库中的功能
// 完成对每个分区数据的持久化操作,整个流程遍历完所有分区后,就将所有相关会话的详细信息都保存到数据库中了。
}
});
}
}
}

Loading…
Cancel
Save