From e2745fe65ca5696a210e7033d9d4f038b8420d2b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=8A=9D=E9=85=92=E5=8D=83=E7=99=BE?= <3489241516@qq.com>
Date: Tue, 17 Dec 2024 01:28:37 +0800
Subject: [PATCH] =?UTF-8?q?1.=E7=8E=AF=E5=A2=83=E6=90=AD=E5=BB=BA=E5=AE=8C?=
=?UTF-8?q?=E6=88=90=202.=E7=9B=B8=E5=85=B3=E7=9A=84=E5=B7=A5=E5=85=B7?=
=?UTF-8?q?=E7=B1=BB=E7=BC=96=E5=86=99=E5=AE=8C=E6=88=9011=203.=E9=85=8D?=
=?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=E7=AE=A1=E7=90=86=E7=B1=BB=E7=BC=96?=
=?UTF-8?q?=E5=86=99=E5=AE=8C=E6=88=901?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/workspace.xml | 25 +-
.../cn/edu/hust/session/UserVisitAnalyze.java | 444 ++++++------------
2 files changed, 171 insertions(+), 298 deletions(-)
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 163cca3..fb6687f 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,7 +4,7 @@
-
+
@@ -480,7 +480,7 @@
-
+
1529592741848
@@ -616,7 +616,23 @@
1734368636062
-
+
+
+ 1734369061766
+
+
+
+ 1734369061766
+
+
+
+ 1734369482371
+
+
+
+ 1734369482371
+
+
@@ -742,7 +758,8 @@
-
+
+
diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
index 1167441..b6c30c3 100644
--- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
+++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
@@ -1,249 +1,55 @@
-//这行代码定义了当前类所在的包名。包名用于组织和管理类。
package cn.edu.hust.session;
-//导入 cn.edu.hust.conf 包下的 ConfigurationManager 类。这个类可能用于管理配置信息
-
import cn.edu.hust.conf.ConfigurationManager;
-
-//导入 cn.edu.hust.constant 包下的 Constants 类。这个类可能包含一些常量定义。
-
import cn.edu.hust.constant.Constants;
-
-//导入 cn.edu.hust.dao 包下的 TaskDao 类。这个类可能用于与任务相关的数据访问操作。
-
import cn.edu.hust.dao.TaskDao;
-
-//导入 cn.edu.hust.dao.factory 包下的 DaoFactory 类。这个类可能用于创建 DAO 对象。
-
import cn.edu.hust.dao.factory.DaoFactory;
-
-//导入 cn.edu.hust.domain 包下的所有类。这个包可能包含一些数据模型类。
-
import cn.edu.hust.domain.*;
-
-//导入 cn.edu.hust.mockData 包下的 MockData 类。这个类可能用于生成模拟数据
-
import cn.edu.hust.mockData.MockData;
-
-//导入 cn.edu.hust.util 包下的所有类。这个包可能包含一些通用工具类。
-
import cn.edu.hust.util.*;
-
-//导入 com.alibaba.fastjson.JSONObject 类。FastJSON 是一个用于处理 JSON 数据的库。
-
import com.alibaba.fastjson.JSONObject;
-
-//导入 org.apache.spark.Accumulator 类。这个类用于在 Spark 作业中共享和累加数据。
-
import org.apache.spark.Accumulator;
-
-//导入 org.apache.spark.SparkConf 类。这个类用于配置 Spark 应用程序。
-
import org.apache.spark.SparkConf;
-
-//导入 org.apache.spark.SparkContext 类。这个类是 Spark 应用程序的主入口点。
-
import org.apache.spark.SparkContext;
-
-//导入 org.apache.spark.api.java.JavaPairRDD 类。
-// 这个类是用于处理键值对的 RDD(弹性分布式数据集)
-
import org.apache.spark.api.java.JavaPairRDD;
-
-//导入 org.apache.spark.api.java.JavaRDD 类。这个类是用于处理普通 RDD 的 Java API。
-
import org.apache.spark.api.java.JavaRDD;
-
-//导入了 JavaSparkContext 类。
-// JavaSparkContext 是 org.apache.spark.api.java 包下的一个类,提供了与 Spark 集群交互的 Java API。
-
import org.apache.spark.api.java.JavaSparkContext;
-
-//导入 org.apache.spark.api.java.function 包下的所有函数类。
-// 这些类提供了常见的操作函数,如 FlatMapFunction, MapFunction, FilterFunction 等。
-
import org.apache.spark.api.java.function.*;
-
-//导入 org.apache.spark.sql.DataFrame 类。这个类是用于操作结构化数据的。
-
import org.apache.spark.sql.DataFrame;
-
-//导入 org.apache.spark.sql.Row 类。这个类是用于表示一行数据的。
-
import org.apache.spark.sql.Row;
-
-//导入 org.apache.spark.sql.SQLContext 类。
-// 这个类用于创建 Spark SQL 的上下文环境
-
import org.apache.spark.sql.SQLContext;
-
-//导入 org.apache.spark.sql.hive.HiveContext 类。
-//这个类用于与 Hive 兼容的 Spark SQL 上下文环境
-
import org.apache.spark.sql.hive.HiveContext;
-
-//导入 org.apache.spark.storage.StorageLevel 类。
-// 这个类定义了 Spark 中数据存储的级别。
-
import org.apache.spark.storage.StorageLevel;
-
-//导入 scala.Tuple2 类。这个类用于表示一个二元组(一个包含两个元素的元组)。
-
import scala.Tuple2;
-
import java.util.*;
-/**
- * 用户可以查询的范围包含
- * 1。用户的职业
- * 2。用户的性别
- * 3。用户城市
- * 4。用户年龄
- * 5。获取搜索词
- * 6。获取点击品类
- */
public class UserVisitAnalyze {
- public static void main(String[] args)
- {
-
- //初始化 args 参数数组,用于传递给 main 方法。
- // 这里暂时设置为 {"1"},实际使用时可以根据需要调整。
- args=new String[]{"1"};
- /**
- * 构建spark上下文
- */
- //创建 SparkConf 对象并设置应用程序名称和运行模式(本地模式,使用3个核心)。
- //使用 SparkConf 创建 JavaSparkContext 实例,初始化Spark上下文。
- SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
- JavaSparkContext context=new JavaSparkContext(conf);
-
- //通过 JavaSparkContext 获取 SQLContext,用于执行SQL查询。
- SQLContext sc=getSQLContext(context.sc());
- //生成模拟数据,调用 mock 方法生成模拟数据到Spark环境中。
- mock(context,sc);
-
- //拿到相应的Dao组建
- //通过 DaoFactory 获取 TaskDao 实例,用于数据库操作。
- TaskDao dao= DaoFactory.getTaskDao();
-
- //从外部传入的参数获取任务的id
- //通过 args 参数获取任务ID。
- Long taskId=ParamUtils.getTaskIdFromArgs(args);
-
- //从数据库中查询出相应的task
- //通过任务ID从数据库中查询任务信息,并将任务参数解析为 JSONObject
- Task task=dao.findTaskById(taskId);
- JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
-
- //获取指定范围内的Sesssion
- //调用 getActionRDD 方法获取包含指定范围内的Session的RDD。
- JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject);
-
- //这里增加一个新的方法,主要是映射
- //将 sessionRangeDate 转换为包含键值对的 PairRDD。
- JavaPairRDD sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
-
- //重复用到的RDD进行持久化
- //将 sessionInfoPairRDD 持久化到磁盘,提高后续操作的性能
+ public static void main(String[] args) {
+ args = new String[]{"1"};
+ SparkConf conf = new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
+ JavaSparkContext context = new JavaSparkContext(conf);
+ SQLContext sc = getSQLContext(context.sc());
+ mock(context, sc);
+ TaskDao dao = DaoFactory.getTaskDao();
+ Long taskId = ParamUtils.getTaskIdFromArgs(args);
+ Task task = dao.findTaskById(taskId);
+ JSONObject jsonObject = JSONObject.parseObject(task.getTaskParam());
+ JavaRDD sessionRangeDate = getActionRDD(sc, jsonObject);
+ JavaPairRDD sessionInfoPairRDD = getSessonInfoPairRDD(sessionRangeDate);
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
-
- //按照Sesson进行聚合
- //调用 aggregateBySessionId 方法对Session信息进行聚合。
- JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
-
- //通过条件对RDD进行筛选
- //使用Accumulator进行统计:
- //创建一个 Accumulator 来统计聚合结果。
- Accumulator sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
-
- //过滤和统计Session信息:
- //在进行accumulator之前,需要aciton动作,不然会为空
- //调用 filterSessionAndAggrStat 方法过滤并统计Session信息。
- JavaPairRDD filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
-
- //重复用到的RDD进行持久化
- //将过滤后的RDD持久化到磁盘。
+ JavaPairRDD sesssionAggregateInfoRDD = aggregateBySessionId(sc, sessionInfoPairRDD);
+ Accumulator sessionAggrStatAccumulator = context.accumulator("", new SessionAggrStatAccumulator());
+ JavaPairRDD filteredSessionRDD = filterSessionAndAggrStat(sesssionAggregateInfoRDD, jsonObject, sessionAggrStatAccumulator);
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
-
- //获取符合过滤条件的全信息公共RDD
- //调用 getFilterFullInfoRDD 方法获取包含完整信息的公共RDD。
- JavaPairRDD commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
-
- //重复用到的RDD进行持久化
- //将公共RDD持久化到磁盘。
+ JavaPairRDD commonFullClickInfoRDD = getFilterFullInfoRDD(filteredSessionRDD, sessionInfoPairRDD);
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
- //session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
-
-
-
- /**
- * 重构实现的思路:
- * 1。不要去生成任何的新RDD
- *
- * 2。不要去单独遍历一遍sesion的数据
- *
- * 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
- *
- * 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
- *
- * 开发Spark的经验准则
- *
- * 1。尽量少生成RDD
- *
- * 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
- *
- * 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
- *
- * shuffle操作,会导致大量的磁盘读写,严重降低性能
- * 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
- * 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
- *
- * 4。无论做什么功能,性能第一
- *
- * 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
- * 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
- * 一场灾难。
- */
-
-
-
- /**
- * 使用CountByKey算子实现随机抽取功能
- */
- //调用 randomExtractSession 方法实现随机抽取功能。
- randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
-
- //在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算
- //filteredSessionRDD.count();
- //计算各个session占比,并写入MySQL
-
- //调用 calculateAndPersist 方法计算并持久化聚合统计结果。
- calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
-
- //调用 getTop10Category 方法获取热门品类数据Top10
- List> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
-
- //调用 getTop10Session 方法获取热门品类点击Top10Session。
- getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
-
- //关闭Spark上下文,释放资源
+ randomExtractSession(taskId, filteredSessionRDD, sessionInfoPairRDD);
+ calculateAndPersist(sessionAggrStatAccumulator.value(), taskId);
+ List> top10CategoryIds = getTop10Category(taskId, commonFullClickInfoRDD);
+ getTop10Session(context, taskId, sessionInfoPairRDD, top10CategoryIds);
context.close();
}
-
- /**
- * 功能总结
- * 配置和初始化:配置Spark环境并初始化Spark上下文。
- * 生成模拟数据:生成模拟数据到Spark环境中。
- * 数据库操作:从数据库中获取任务信息。
- * 数据处理:读取和处理Session数据,进行聚合、过滤、统计等操作。
- * 持久化:将中间结果持久化以提高性能。
- * 统计和汇总:计算并汇总统计数据。
- * 随机抽取和输出:实现随机抽取功能并输出结果。
- * 关闭资源:关闭Spark上下文,释放资源。
- * 通过这些步骤,可以完成用户访问分析的整个流程。
- */
-
-
+}
@@ -368,14 +174,14 @@ public class UserVisitAnalyze {
//定义方法签名:
//定义一个静态方法 getSessonInfoPairRDD,该方法接受一个 JavaRDD 类型的参数 sessionRangeDate,
// 并返回一个 JavaPairRDD。
- private static JavaPairRDD getSessonInfoPairRDD(JavaRDD sessionRangeDate) {
+ private static JavaPairRDD getSessonInfoPairRDD(JavaRDD sessionRangeDate) {
//使用 mapToPair 方法:
- //使用 mapToPair 方法将 JavaRDD 转换为 JavaPairRDD。
- // mapToPair 方法接受一个 PairFunction 实现类,
- // 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2。
- return sessionRangeDate.mapToPair(new PairFunction() {
+ //使用 mapToPair 方法将 JavaRDD 转换为 JavaPairRDD。
+ // mapToPair 方法接受一个 PairFunction 实现类,
+ // 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2。
+ return sessionRangeDate.mapToPair(new PairFunction() {
@Override
//实现 PairFunction:
//实现 PairFunction 接口的 call 方法。
@@ -388,16 +194,16 @@ public class UserVisitAnalyze {
return new Tuple2(row.getString(2),row);
}
});
- /**
- * 功能解释
- * 映射操作:使用 mapToPair 方法将每个 Row 对象转换为一个键值对 Tuple2。具体来说,
- * 键是 Row 对象中的第3列的值,值是整个 Row 对象。
- * 生成键值对:通过提取 Row 对象中的特定字段生成键值对,方便后续的聚合和处理操作。
- * getSessonInfoPairRDD 方法:将 JavaRDD 转换为 JavaPairRDD,
- * 其中键为 Row 对象中的 sessionId,
- * 值为整个 Row 对象。
- * 这有助于后续的聚合和处理操作,例如按 sessionId 分组统计或其他复杂操作。
- */
+ /**
+ * 功能解释
+ * 映射操作:使用 mapToPair 方法将每个 Row 对象转换为一个键值对 Tuple2。具体来说,
+ * 键是 Row 对象中的第3列的值,值是整个 Row 对象。
+ * 生成键值对:通过提取 Row 对象中的特定字段生成键值对,方便后续的聚合和处理操作。
+ * getSessonInfoPairRDD 方法:将 JavaRDD 转换为 JavaPairRDD,
+ * 其中键为 Row 对象中的 sessionId,
+ * 值为整个 Row 对象。
+ * 这有助于后续的聚合和处理操作,例如按 sessionId 分组统计或其他复杂操作。
+ */
}
@@ -420,11 +226,11 @@ public class UserVisitAnalyze {
/**
*
代码重构
- JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() {
- @Override
- public Tuple2 call(Row row) throws Exception {
- return new Tuple2(row.getString(2),row);
- }
+ JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getString(2),row);
+ }
});*/
/**
* 根据sessionId进行分组
@@ -497,7 +303,7 @@ public class UserVisitAnalyze {
{
endTime=actionTime;
}
- stepLength++;
+ stepLength++;
}
@@ -516,22 +322,22 @@ public class UserVisitAnalyze {
});
//查询所有的用户数据
- String sql="select * from user_info";
+ String sql="select * from user_info";
- //执行SQL查询获取用户信息,并将结果转换为 JavaRDD。
- JavaRDD userInfoRDD=sc.sql(sql).javaRDD();
+ //执行SQL查询获取用户信息,并将结果转换为 JavaRDD。
+ JavaRDD userInfoRDD=sc.sql(sql).javaRDD();
- //将用户信息映射成map
+ //将用户信息映射成map
//使用 mapToPair 方法将 JavaRDD 转换为 JavaPairRDD,
// 其中键为 Row 对象中的某个列(通常是 userId),值为整个 Row 对象。
- JavaPairRDD userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction() {
- @Override
- public Tuple2 call(Row row) throws Exception {
- return new Tuple2(row.getLong(0),row);
- }
- });
-
- //将两个信息join在一起
+ JavaPairRDD userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getLong(0),row);
+ }
+ });
+
+ //将两个信息join在一起
//使用 join 方法将两个 JavaPairRDD 进行连接,连接键为 userId。结果是一个 JavaPairRDD>,
// 其中键为 userId,值为一个包含用户会话信息和用户信息的元组。
JavaPairRDD> tuple2JavaPairRDD=sessionPartInfo.join(userInfoPariRDD);
@@ -737,7 +543,7 @@ public class UserVisitAnalyze {
else if(stepLength>30&&stepLength<=60)
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
else if(stepLength>60)
- sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
});
//返回过滤后的会话数据 filteredSessionRDD。
@@ -847,7 +653,7 @@ public class UserVisitAnalyze {
// 从键中分割出小时部分,以"_"作为分隔符取第二个元素。
String hour=entry.getKey().split("_")[1];
- // 根据日期从dateHourCountMap中获取对应的内层Map(存放小时和数量的映射),
+ // 根据日期从dateHourCountMap中获取对应的内层Map(存放小时和数量的映射),
// 如果不存在则返回null。
Map hourCount=dateHourCountMap.get(date);
@@ -1435,14 +1241,14 @@ public class UserVisitAnalyze {
- /**
- * 将几个品类相连接
- * @param categoryRDD
- * @param clickCategoryRDD
- * @param orderCategoryRDD
- * @param payCategoryRDD
- * @return
- */
+ /**
+ * 将几个品类相连接
+ * @param categoryRDD
+ * @param clickCategoryRDD
+ * @param orderCategoryRDD
+ * @param payCategoryRDD
+ * @return
+ */
private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) {
// 此方法名为joinCategoryAndData,作用是将不同来源的品类相关数据(品类ID以及对应的点击、下单、支付次数等信息)进行关联整合,
// 接收四个JavaPairRDD类型的参数,分别是categoryRDD(包含品类ID信息)、clickCategoryRDD(点击品类次数相关信息)、
@@ -1579,7 +1385,7 @@ public class UserVisitAnalyze {
- //后去支付品类RDD
+ //后去支付品类RDD
private static JavaPairRDD getPayCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
// 此方法名为getPayCategoryRDD,其目的是从给定的JavaPairRDD类型的sessionId2DetailRDD中提取出与支付品类相关的信息,
// 经过一系列处理后,最终返回一个JavaPairRDD类型的结果,其中键和值大概率都与支付品类相关(比如品类ID以及对应的支付次数等,从后续代码逻辑推测)。
@@ -1671,7 +1477,7 @@ public class UserVisitAnalyze {
- //获取下单品类RDD
+ //获取下单品类RDD
private static JavaPairRDD getOrderCategoryRDD(JavaPairRDD sessionId2DetailRDD) {
// 此方法名为getOrderCategoryRDD,其功能是从给定的JavaPairRDD类型的sessionId2DetailRDD中提取出与下单品类相关的信息,
// 通过一系列的数据处理操作,最终返回一个JavaPairRDD类型的结果,这个结果大概率包含了下单品类ID以及对应的下单次数等相关数据(从代码逻辑推测)。
@@ -1835,7 +1641,7 @@ public class UserVisitAnalyze {
- //获取每一个品类的Session Top10
+ //获取每一个品类的Session Top10
private static void getTop10Session(JavaSparkContext sc, final Long taskId, JavaPairRDD sessionInfoPairRDD, List> top10CategoryIds)
{
// 此方法名为`getTop10Session`,作用是基于给定的Spark上下文(`JavaSparkContext`)、任务ID(`taskId`)、
@@ -1855,17 +1661,25 @@ public class UserVisitAnalyze {
// 并赋值给`countInfo`变量,后续将从这个字符串里按照特定的分隔规则提取出品类ID相关内容。
Long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
+ // 借助`StringUtils`工具类(应该是自定义的用于字符串处理的工具类)的`getFieldFromConcatString`方法,
+ // 按照“|”作为分隔符,从`countInfo`字符串中提取出对应`Constants.FIELD_CATEGORY_ID`(推测是在`Constants`类中定义好的常量,用于明确品类ID在字符串中的位置等信息)
+ // 这个字段对应的内容,然后将其转换为`Long`类型,赋值给`categoryId`变量,这样就获取到了当前品类的ID信息。
categoryIdList.add(new Tuple2(categoryId, categoryId));
-
+ // 将获取到的品类ID信息封装成一个键值对(这里键和值都设置为该品类ID),然后添加到`categoryIdList`列表中,
+ // 这么做可能是为了后续方便统一数据格式,便于生成RDD以及基于品类ID与其他数据做关联操作等。
}
// 生成一份RDD
JavaPairRDD top10CategoryIdsRDD = sc.parallelizePairs(categoryIdList);
+ // 使用`JavaSparkContext`(`sc`)的`parallelizePairs`方法,把包含品类ID信息的`categoryIdList`列表转换为一个`JavaPairRDD`类型的RDD(即`top10CategoryIdsRDD`),
+ // 该RDD以键值对形式存储了排名前10的品类的ID信息(键和值都是品类ID),方便后续在Spark环境下基于品类ID与其他RDD进行各种操作,例如进行连接(`join`)等操作。
// 按照SessionId进行分组
JavaPairRDD> gourpBySessionIdRDD = sessionInfoPairRDD.groupByKey();
-
+ // 调用`sessionInfoPairRDD`的`groupByKey`方法,对这个RDD按照其键(也就是`SessionId`)进行分组操作,
+ // 分组后的结果是一个新的`JavaPairRDD>`类型的`gourpBySessionIdRDD`,其键为`SessionId`,值是一个可迭代的`Row`类型的集合,
+ // 这意味着每个`SessionId`对应的所有会话详细信息(以`Row`类型表示,通常每行数据包含多个字段)会被放在一起,方便后续针对每个会话做进一步的相关统计分析等操作。
// 计算每一个session对品类的点击次数
JavaPairRDD categorySessionCountRDD = gourpBySessionIdRDD.flatMapToPair(new PairFlatMapFunction>, Long, String>() {
@@ -1875,17 +1689,29 @@ public class UserVisitAnalyze {
@Override
public Iterable> call(Tuple2> tuple2) throws Exception {
+ // 重写了`PairFlatMapFunction`接口中的`call`方法,当执行`flatMapToPair`操作时,针对`gourpBySessionIdRDD`中的每一个元素(每个`Tuple2>`类型的键值对)都会调用这个`call`方法来执行具体的转换逻辑,
+ // 参数`tuple2`就是当前正在处理的那个键值对元素,其类型符合接口定义中的`Tuple2>`。
String sessionId = tuple2._1;
+ // 从传入的键值对`tuple2`中获取其键部分(也就是当前会话的`SessionId`),并赋值给`sessionId`变量,
+ // 后续的所有操作都会基于这个会话ID来统计该会话对不同品类的点击次数等信息,确保数据处理是围绕着同一个会话进行的。
// 保存每一个品类的单击次数
Map categoryIdCount = new HashMap();
+ // 创建一个名为`categoryIdCount`的`HashMap`,用于存储每个品类ID(以`Long`类型作为键)对应的点击次数(以`Long`类型作为值),
+ // 这个`Map`会在遍历当前会话的详细信息时,用来记录和更新各个品类的点击次数情况。
for (Row row : tuple2._2) {
+ // 开始遍历`tuple2`的值部分(是一个可迭代的`Row`类型的集合,代表当前`SessionId`对应的所有会话详细信息行),
+ // 通过遍历这些行数据,可以获取每行中的相关字段信息,以此来判断是否有品类点击行为,并相应地更新对应品类的点击次数。
if (row.get(6) != null) {
+ // 判断当前行数据(`Row`类型的`row`)中索引为6的字段(从代码上下文推测这个字段代表点击品类的ID,不过具体还是要结合数据结构定义来确定)是否为空,
+ // 如果不为空,说明存在点击品类相关的信息,那就需要对该品类的点击次数进行处理了。
Long count = categoryIdCount.get(row.getLong(6));
+ // 从`categoryIdCount`这个`Map`中获取当前点击品类的ID(通过`row.getLong(6)`获取点击品类的ID,并以此作为键)对应的点击次数,
+ // 如果该品类ID之前已经存在于`Map`中,那么就能获取到对应的点击次数值(可能初始为0或者是之前累计的次数),要是不存在则会返回`null`。
if (count == null) {
count = 0L;
@@ -1897,26 +1723,26 @@ public class UserVisitAnalyze {
categoryIdCount.put(row.getLong(6), count);
// 把更新后的点击次数(`count`变量的值)重新放回`categoryIdCount`这个`Map`中,以当前点击品类的ID(`row.getLong(6)`)作为键,
-
+ // 这样就完成了对该品类点击次数的更新操作,保证`Map`里始终记录着每个品类在当前会话中的最新点击次数情况。
}
}
List> categoryIdCountList = new ArrayList>();
// 创建一个名为`categoryIdCountList`的`ArrayList`列表,用于存放即将生成的包含品类ID以及对应点击次数信息的键值对,
-
+ // 其键为品类ID(`Long`类型),值是一个包含会话ID和点击次数的字符串(`String`类型),方便后续整合数据以及转换为RDD进行处理。
for (Map.Entry entry : categoryIdCount.entrySet()) {
// 开始遍历`categoryIdCount`这个`Map`中的所有键值对(每个键值对代表一个品类ID及其对应的点击次数),
-
+ // 通过遍历这些键值对,可以依次提取每个品类的相关信息,进而构建成新的键值对添加到`categoryIdCountList`列表中。
// 将数据拼接
String value = sessionId + "," + entry.getValue();
// 根据一定的格式规则构建一个字符串`value`,把当前会话的`ID`(`sessionId`)和当前品类的点击次数(通过`entry.getValue()`获取到的`Long`类型的点击次数,转换为字符串后)
-
+ // 用逗号作为分隔符连接起来,形成一个包含会话ID和点击次数信息的字符串,方便后续继续整合其他信息以及做持久化等操作使用。
categoryIdCountList.add(new Tuple2(entry.getKey(), value));
// 将品类ID(通过`entry.getKey()`获取到的`Long`类型的品类ID)和构建好的包含会话ID与点击次数信息的字符串(`value`)封装成一个新的键值对,
-
+ // 然后添加到`categoryIdCountList`列表中,这样就把每个品类在当前会话中的点击次数信息整理成了便于后续处理的键值对形式。
}
return categoryIdCountList;
// 返回包含了所有品类在当前会话中的点击次数信息(以键值对形式存在,键为品类ID,值为包含会话ID和点击次数的字符串)的`categoryIdCountList`列表,
@@ -1934,22 +1760,25 @@ public class UserVisitAnalyze {
@Override
public Tuple2 call(Tuple2> tuple2) throws Exception {
// 重写了PairFunction接口中的call方法,当执行mapToPair操作时,会针对连接后的结果数据中的每一个元素(每个Tuple2>类型的键值对)调用这个call方法,
-
+ // 参数tuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2>。
return new Tuple2(tuple2._1, tuple2._2._2);
// 从传入的键值对tuple2中提取其键部分(也就是品类ID,tuple2._1)以及值部分中包含点击次数信息的字符串(tuple2._2._2),
-
+ // 创建一个新的键值对,键为品类ID,值为包含点击次数信息的字符串,这样就得到了每一个热门品类在各个会话中的点击次数相关信息的键值对形式,
+ // 这些新的键值对将构成新的JavaPairRDD(即前面定义的top10CategorySessionCountRDD),用于后续基于热门品类的分组等操作。
}
});
// 根据品类分组
JavaPairRDD> top10CategorySessionCountGroupRDD = top10CategorySessionCountRDD.groupByKey();
// 调用top10CategorySessionCountRDD的groupByKey方法,对其按照键(也就是品类ID)进行分组操作,
-
+// 分组后的结果是一个新的JavaPairRDD>类型的top10CategorySessionCountGroupRDD,其键为品类ID,值是一个可迭代的字符串集合,
+// 每个集合中的字符串包含了对应品类在不同会话中的点击次数等相关信息,方便后续针对每个品类汇总分析其在各个会话中的情况,例如找出每个品类对应的点击次数较多的会话等操作。
JavaPairRDD top10CategorySessionRDD = top10CategorySessionCountGroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() {
// 调用top10CategorySessionCountGroupRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
-
+ // 此操作的目的是将分组后的top10CategorySessionCountGroupRDD中的元素(类型为Tuple2>)按照自定义的逻辑转换为零个、一个或多个新的键值对(输出类型是Tuple2),
+ // 在这里主要是从每个品类对应的会话点击次数信息中提取出排名前10的会话信息,并整理成相应的键值对形式,便于后续持久化到数据库等操作。
@Override
public Iterable> call(Tuple2> tuple2) throws Exception {
@@ -1971,18 +1800,20 @@ public class UserVisitAnalyze {
List> sessionIdList = new ArrayList>();
// 创建一个名为sessionIdList的ArrayList列表,用于存放即将生成的包含会话ID的键值对,其键和值都为会话ID(String类型),
-
+ // 后续会将排名前10的会话ID信息整理成这样的键值对添加到这个列表中,用于返回给外部或者进行其他相关操作(比如和其他数据关联等)。
for (String sessionCount : tuple2._2) {
// 开始遍历tuple2的值部分(是一个可迭代的字符串集合,每个字符串包含了对应品类在某个会话中的点击次数等相关信息),通过遍历可以依次获取每个会话的相关信息,
+ // 用于判断是否能进入排名前10的会话列表以及更新相应的排名情况。
String[] sessionCountSplited = sessionCount.split(",");
// 将当前遍历到的包含会话信息的字符串(sessionCount)按照逗号作为分隔符进行拆分,得到一个字符串数组sessionCountSplited,
-
+ // 数组中第一个元素(索引为0)应该是会话ID,第二个元素(索引为1)应该是点击次数(从前面构建字符串的逻辑推测),方便后续分别提取使用。
// String sessionId = sessionCountSplited[0];
Long count = Long.valueOf(sessionCountSplited[1]);
// 从拆分后的字符串数组sessionCountSplited中获取第二个元素(索引为1),也就是点击次数信息,并将其转换为Long类型,赋值给count变量,
+ // 用于后续和已有的排名前10的会话信息进行比较,判断是否能进入前10的列表。
// 获取TopN
for (int i = 0; i < top10Sessions.length; i++) {
@@ -1999,7 +1830,10 @@ public class UserVisitAnalyze {
}
}
}
-
+ // 上述循环用于将当前会话的点击次数信息(sessionCount)与已有的top10Sessions数组中的会话信息进行比较,
+ // 如果top10Sessions数组中某个位置为空(即还未满10个会话信息),则直接将当前会话信息放入该位置;
+ // 如果该位置已有会话信息,则比较它们的点击次数(通过解析字符串获取),如果当前会话的点击次数大于已有位置的点击次数,
+ // 则将已有位置及之后的会话信息依次往后移动一位(腾出当前位置),然后将当前会话信息放入该位置,以此来动态维护每个品类的前10个点击次数最多的会话信息列表。
}
// 封装数据
@@ -2013,19 +1847,33 @@ public class UserVisitAnalyze {
sessionIdList.add(new Tuple2(sessionId, sessionId));
}
}
+ // 对于top10Sessions数组中不为空的每个位置(也就是每个品类对应的排名前10的会话信息),进行如下操作:
+ // 首先创建一个Top10CategorySession类型的对象top10CategorySession,然后从对应的会话信息字符串(top10Sessions[i])中解析出会话ID(通过split方法获取第一个元素)
+ // 和点击次数(通过split方法获取第二个元素并转换为Long类型),接着调用top10CategorySession对象的set方法(假设该类有这样的方法用于设置属性值),
+ // 将外部传入的任务ID(taskId)、当前品类ID(categoryId)、解析出的会话ID(sessionId)以及点击次数(count)设置到对象中,
+ // 将封装好数据的top10CategorySession对象添加到top10CategorySessionList列表中,同时将会话ID封装成键值对(键和值都为会话ID)添加到sessionIdList列表中,
+ // 这样top10CategorySessionList就包含了所有需要持久化到数据库的排名前10的品类会话详细信息对象,而sessionIdList则包含了对应的会话ID信息键值对。
// 批量插入数据库
DaoFactory.getTop10CategorySessionDao().batchInsert(top10CategorySessionList);
+ // 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getTop10CategorySessionDao方法,
+ // 获取到用于操作Top10CategorySession数据的DAO对象,然后调用其batchInsert方法,
+ // 将包含要持久化数据的top10CategorySessionList列表传入,实现将排名前10的品类会话详细信息批量插入到数据库中的功能,完成数据持久化的操作流程。
return sessionIdList;
-
+ // 返回包含会话ID信息键值对的sessionIdList列表,虽然这里代码逻辑上返回了这个列表,但从整体功能来看,重点在于前面已经完成了将排名前10的品类会话详细信息插入数据库的操作,
+ // 这个返回值可能在后续代码(如果有的话)中用于其他关联操作或者只是作为一种函数执行结果的返回形式存在,具体要结合整体的代码上下文来进一步确定其用途。
}
});
//3. 获取session的明细数据保存到数据库
JavaPairRDD> sessionDetailRDD = top10CategorySessionRDD.join(sessionInfoPairRDD);
-
+// 对top10CategorySessionRDD(前面经过一系列处理得到的与排名前10的品类会话相关的RDD,其键值对类型为,从前面代码逻辑推测键可能是会话相关信息,值是对应信息字符串)
+// 和sessionInfoPairRDD(包含完整会话信息的RDD,键值对类型为,键是会话ID,值是包含多个字段的Row类型的会话详细数据行)进行连接(join)操作。
+// 连接的目的是将排名前10的品类会话相关信息与完整的会话详细数据进行整合,生成一个新的JavaPairRDD>类型的sessionDetailRDD,
+// 其键为某个用于关联的标识(从前面逻辑推测可能与会话相关),值是一个包含两个元素的Tuple2,第一个元素是字符串(可能是前面top10品类会话相关的部分信息),第二个元素是Row类型的完整会话详细数据,
+// 方便后续基于整合后的数据提取出需要持久化到数据库的详细会话信息。
sessionDetailRDD.foreachPartition(new VoidFunction>>>() {
// 调用sessionDetailRDD的foreachPartition方法,传入一个实现了VoidFunction接口的匿名内部类实例,
@@ -2035,81 +1883,89 @@ public class UserVisitAnalyze {
@Override
public void call(Iterator>> tuple2Iterator) throws Exception {
// 重写了VoidFunction接口中的call方法,当foreachPartition方法遍历每个分区时,会针对每个分区对应的迭代器(Iterator>>类型,其中元素是包含会话相关信息的复杂键值对)调用这个call方法,
-
+ // 参数tuple2Iterator就是当前正在处理的那个分区对应的迭代器,通过它可以遍历该分区内的所有元素(每个元素是Tuple2>类型的键值对)进行具体的数据提取和处理操作。
List sessionDetailList = new ArrayList();
// 创建一个名为sessionDetailList的ArrayList列表,用于存放SessionDetail类型的对象,
-
+ // SessionDetail应该是自定义的用于封装要持久化到数据库的详细会话信息的实体类,后续会将从分区数据中提取出来的各个会话的详细信息封装成此类对象添加到这个列表中,
+ // 以便进行批量插入数据库的操作。
while (tuple2Iterator.hasNext()) {
// 通过调用迭代器(tuple2Iterator)的hasNext方法判断当前分区内是否还有未处理的元素(即Tuple2>类型的键值对),如果有则进入循环进行处理。
Tuple2> tuple2 = tuple2Iterator.next();
// 调用迭代器(tuple2Iterator)的next方法获取下一个要处理的元素(键值对),并赋值给tuple2变量,
-
+ // tuple2的类型为Tuple2>,其第一个元素(tuple2._1)可能是会话相关的标识信息,第二个元素(tuple2._2)是一个包含字符串和Row类型数据的Tuple2,
+ // 后续将从这个复杂的结构中提取出具体的会话详细字段信息进行封装。
Row row = tuple2._2._2;
// 从tuple2的值部分(即Tuple2类型的元素)中获取第二个元素(也就是Row类型的数据),赋值给row变量,
-
+ // Row类型通常代表一行包含多个字段的会话详细数据,后续将从这个row中提取出各个具体的字段信息,用于构建SessionDetail对象。
String sessionId = tuple2._1;
// 从tuple2中获取其键部分(也就是前面提到的可能与会话相关的标识信息),赋值给sessionId变量,这里将其作为会话的ID信息,
-
+ // 后续会把这个会话ID设置到SessionDetail对象中,确保数据的完整性和关联性。
Long userId = row.getLong(1);
// 从row数据行中获取索引为1的字段(从代码上下文推测该字段存储的是用户ID信息,不过具体要结合数据结构定义确定),并将其转换为Long类型,赋值给userId变量,
-
+ // 用于后续将用户ID信息封装到SessionDetail对象中,记录该会话对应的用户情况。
Long pageId = row.getLong(3);
// 从row数据行中获取索引为3的字段(推测该字段存储的是页面ID等相关信息,具体依据数据结构定义),转换为Long类型后赋值给pageId变量,
-
+ // 以便将页面相关信息添加到SessionDetail对象中,完善会话详细信息的记录。
String actionTime = row.getString(4);
// 从row数据行中获取索引为4的字段(可能是会话操作时间等相关信息,依据数据结构而定),赋值给actionTime变量,
-
+ // 后续会把这个时间信息设置到SessionDetail对象中,用于记录会话发生的时间情况。
String searchKeyWard = row.getString(5);
// 从row数据行中获取索引为5的字段(可能是搜索关键词等相关信息,根据实际数据结构来确定),赋值给searchKeyWard变量,
+ // 用于将搜索相关情况添加到SessionDetail对象中,更全面地记录会话的详细内容。
Long clickCategoryId = row.getLong(6);
// 从row数据行中获取索引为6的字段(推测是点击品类的ID信息,结合前面代码逻辑判断),转换为Long类型后赋值给clickCategoryId变量,
-
+ // 这样就能把点击品类相关信息纳入到SessionDetail对象中,记录会话中涉及的品类点击情况。
Long clickProducetId = row.getLong(7);
// 从row数据行中获取索引为7的字段(可能是点击产品的ID信息,根据数据结构定义),赋值给clickProducetId变量,
-
+ // 以便在SessionDetail对象中记录点击产品的相关情况,丰富会话详细信息的内容。
String orderCategoryId = row.getString(8);
// 从row数据行中获取索引为8的字段(推测是下单品类的ID相关信息,从整体业务逻辑推测),赋值给orderCategoryId变量,
-
+ // 后续会将下单品类信息添加到SessionDetail对象中,用于记录会话中涉及的下单品类情况。
String orderProducetId = row.getString(9);
// 从row数据行中获取索引为9的字段(可能是下单产品的ID相关信息,依据数据结构确定),赋值给orderProducetId变量,
-
+ // 以便把下单产品相关情况设置到SessionDetail对象中,更完整地记录会话中下单相关的详细信息。
String payCategoryId = row.getString(10);
// 从row数据行中获取索引为10的字段(推测是支付品类的ID相关信息,结合业务逻辑判断),赋值给payCategoryId变量,
-
+ // 用于将支付品类信息纳入到SessionDetail对象中,记录会话中涉及的支付品类情况。
String payProducetId = row.getString(11);
// 从row数据行中获取索引为11的字段(可能是支付产品的ID相关信息,根据数据结构定义),赋值给payProducetId变量,
-
+ // 这样就能在SessionDetail对象中记录支付产品的相关情况,完善会话详细信息的记录,使其包含会话涉及的各种业务行为相关的详细信息。
SessionDetail sessionDetail = new SessionDetail();
// 创建一个SessionDetail类型的对象sessionDetail,此类是用于封装完整会话详细信息的自定义实体类,接下来将把前面提取到的各个字段信息设置到这个对象中。
sessionDetail.set(taskId, userId, sessionId, pageId, actionTime, searchKeyWard, clickCategoryId, clickProducetId, orderCategoryId, orderProducetId, payCategoryId, payProducetId);
// 调用sessionDetail对象的set方法(假设SessionDetail类有此方法用于设置对象的各个属性值),
+ // 将外部传入的任务ID(taskId)以及前面提取出来的用户ID、会话ID、页面ID、操作时间、搜索关键词、点击品类ID、点击产品ID、下单品类ID、下单产品ID、支付品类ID、支付产品ID等信息设置到sessionDetail对象中,
+ // 完成对该对象的属性赋值操作,使其封装好了当前会话的完整详细信息,准备添加到列表中进行批量插入数据库操作。
sessionDetailList.add(sessionDetail);
// 将封装好数据的sessionDetail对象添加到sessionDetailList列表中,通过循环不断添加,最终这个列表将包含当前分区内所有会话的完整详细信息对象,
-
+ // 待整个分区的数据都处理完后,就可以进行批量插入数据库的操作了。
}
DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
// 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getSessionDetailDao方法,
-
+ // 获取到用于操作SessionDetail数据的DAO对象,然后调用其batchInsert方法,
+ // 将包含要持久化数据的sessionDetailList列表传入,实现将当前分区内所有会话的详细信息批量插入到数据库中的功能,
+ // 完成对每个分区数据的持久化操作,整个流程遍历完所有分区后,就将所有相关会话的详细信息都保存到数据库中了。
}
});
}
- }
\ No newline at end of file
+ }
+