diff --git a/.idea/misc.xml b/.idea/misc.xml
index 344f653..a48fac4 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -6,6 +6,7 @@
+
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index f042c63..6157b54 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -1,19 +1,18 @@
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
@@ -69,6 +68,11 @@
+
@@ -136,27 +140,22 @@
true
DEFINITION_ORDER
-
-
-
-
-
-
-
+
+
+ {
+ "associatedIndex": 8
+}
+
@@ -278,17 +277,27 @@
-
-
-
-
-
-
-
-
-
-
+
+
+
+ {
+ "keyToString": {
+ "JUnit.ParamUtilsTest.test2.executor": "Run",
+ "RunOnceActivity.OpenProjectViewOnStart": "true",
+ "RunOnceActivity.ShowReadmeOnStart": "true",
+ "git-widget-placeholder": "zxr",
+ "kotlin-language-version-configured": "true",
+ "last_opened_file_path": "D:/git/project/UserActionAnalyzePlatform",
+ "node.js.detected.package.eslint": "true",
+ "node.js.detected.package.tslint": "true",
+ "node.js.selected.package.eslint": "(autodetect)",
+ "node.js.selected.package.tslint": "(autodetect)",
+ "nodejs_package_manager_path": "npm",
+ "settings.editor.selected.configurable": "reference.settings.project.maven.repository.indices",
+ "vue.rearranger.settings.migration": "true"
+ }
+}
@@ -299,19 +308,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
+
@@ -319,219 +316,140 @@
+
+
+
+
+
+
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
-
-
-
+
+
+
+
+
-
-
+
+
+
+
+
+
+
@@ -551,6 +469,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
1529592741848
@@ -566,7 +497,175 @@
1529593229553
-
+
+
+ 1734351693404
+
+
+
+ 1734351693404
+
+
+
+ 1734352975106
+
+
+
+ 1734352975106
+
+
+
+ 1734353193141
+
+
+
+ 1734353193141
+
+
+
+ 1734353269426
+
+
+
+ 1734353269426
+
+
+
+ 1734354775314
+
+
+
+ 1734354775314
+
+
+
+ 1734354900465
+
+
+
+ 1734354900465
+
+
+
+ 1734354969705
+
+
+
+ 1734354969705
+
+
+
+ 1734357525655
+
+
+
+ 1734357525655
+
+
+
+ 1734358371283
+
+
+
+ 1734358371283
+
+
+
+ 1734360173676
+
+
+
+ 1734360173676
+
+
+
+ 1734362412393
+
+
+
+ 1734362412393
+
+
+
+ 1734363163041
+
+
+
+ 1734363163041
+
+
+
+ 1734363388179
+
+
+
+ 1734363388179
+
+
+
+ 1734365844392
+
+
+
+ 1734365844392
+
+
+
+ 1734368636062
+
+
+
+ 1734368636062
+
+
+
+ 1734369061766
+
+
+
+ 1734369061766
+
+
+
+ 1734369482371
+
+
+
+ 1734369482371
+
+
+
+ 1734370121454
+
+
+
+ 1734370121454
+
+
+
+ 1734370179773
+
+
+
+ 1734370179773
+
+
+
+ 1734370518519
+
+
+
+ 1734370518519
+
+
+
+ 1734370728253
+
+
+
+ 1734370728253
+
+
@@ -669,47 +768,33 @@
-
+
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
diff --git a/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java b/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java
index 77f83a2..ef29ec8 100644
--- a/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java
+++ b/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java
@@ -1,20 +1,25 @@
package cn.edu.hust.session;
+// 包声明,表明该类所属的包名,用于在项目中对类进行组织和分类管理,方便代码模块化以及避免命名冲突,此处在 cn.edu.hust.session 包下定义类。
import cn.edu.hust.constant.Constants;
import cn.edu.hust.util.StringUtils;
import org.apache.spark.AccumulatorParam;
+// 导入相关类。Constants类可能存放如时间区间、步数区间等相关常量;StringUtils类提供字符串操作相关工具方法;AccumulatorParam是Apache Spark中的接口,用于自定义累加器行为,本类将实现它来定制累加逻辑。
public class SessionAggrStatAccumulator implements AccumulatorParam{
+// 定义一个公共类SessionAggrStatAccumulator,它实现了AccumulatorParam接口,并指定泛型类型为String,意味着该累加器操作的数据类型是字符串,要按接口定义的方法实现针对字符串类型数据的累加规则。
+
@Override
public String addAccumulator(String s, String t1) {
return add(s,t1);
}
-
+// 重写AccumulatorParam接口的addAccumulator方法,接收两个字符串参数s和t1,直接调用类中定义的add方法,并返回其结果,作用是按照自定义规则(在add方法中定义)累加两个字符串所代表的数据,在相应累加场景下发挥作用。
@Override
public String addInPlace(String s, String r1) {
return add(s,r1);
}
+// 同样是重写AccumulatorParam接口的方法,接收两个字符串参数s和r1,也是调用add方法并返回其结果,和addAccumulator方法类似,用于按特定逻辑对传入的两个字符串数据进行累加处理,不过是在Spark内部不同的累加操作阶段被调用。
//主要用于数据的初始化,这里主要返回一个值就是所有范围区间得的数量
@Override
@@ -36,6 +41,7 @@ public class SessionAggrStatAccumulator implements AccumulatorParam{
+ Constants.STEP_PERIOD_30_60 + "=0|"
+ Constants.STEP_PERIOD_60 + "=0";
}
+// 重写AccumulatorParam接口的zero方法,用于对累加器进行初始化操作。返回由多个常量(代表不同时间区间、步数区间等统计维度)和对应初始值0拼接而成的字符串,各部分以“|”分隔,意味着初始化时各统计维度的数量都设为0。
private String add(String v1,String v2)
{
@@ -44,8 +50,11 @@ public class SessionAggrStatAccumulator implements AccumulatorParam{
if(value!=null)
{
int newValue=Integer.valueOf(value)+1;
- return StringUtils.setFieldInConcatString(v1,"\\|",v2,String.valueOf(newValue));
+ return StringUtils.setFieldInConcatString(v1,"\\|",v2,String.valueOf(newValue));
}
return v1;
}
-}
+// 私有方法,供addAccumulator和addInPlace等方法调用。先判断传入的第一个字符串v1是否为空,为空则返回v2。然后通过StringUtils工具类的方法尝试从v1(类似zero方法初始化的格式字符串)中获取和v2对应的字段值(按“|”分割查找),若获取到,将其转成整数加1后再用工具类方法更新回v1对应位置并返回更新后的v1;若没获取到则直接返回原来的v1。
+
+// 整体功能解释:
+// 这个类实现了基于Apache Spark的自定义累加器功能,用于统计与会话(Session)相关的聚合统计信息,比如统计不同时间区间(像1s - 3s、4s - 6s等时间范围)以及不同步数区间(如1 - 3步、4 - 6步等范围)内会话的数量情况。zero方法初始化各统计维度计数值为0,addAccumulator和addInPlace方法按照add方法定义的逻辑对相应统计维度的计数值进行累加(每出现符合对应区间的会话相关数据,对应维度计数加1),方便后续在Spark任务执行中对会话多维度聚合统计信息进行收集和汇总。
\ No newline at end of file
diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
index ae867a7..1e3af45 100644
--- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
+++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
@@ -1,25 +1,94 @@
+//这行代码定义了当前类所在的包名。包名用于组织和管理类。
package cn.edu.hust.session;
+//导入 cn.edu.hust.conf 包下的 ConfigurationManager 类。这个类可能用于管理配置信息
+
import cn.edu.hust.conf.ConfigurationManager;
+
+//导入 cn.edu.hust.constant 包下的 Constants 类。这个类可能包含一些常量定义。
+
import cn.edu.hust.constant.Constants;
+
+//导入 cn.edu.hust.dao 包下的 TaskDao 类。这个类可能用于与任务相关的数据访问操作。
+
import cn.edu.hust.dao.TaskDao;
+
+//导入 cn.edu.hust.dao.factory 包下的 DaoFactory 类。这个类可能用于创建 DAO 对象。
+
import cn.edu.hust.dao.factory.DaoFactory;
+
+//导入 cn.edu.hust.domain 包下的所有类。这个包可能包含一些数据模型类。
+
import cn.edu.hust.domain.*;
+
+//导入 cn.edu.hust.mockData 包下的 MockData 类。这个类可能用于生成模拟数据
+
import cn.edu.hust.mockData.MockData;
+
+//导入 cn.edu.hust.util 包下的所有类。这个包可能包含一些通用工具类。
+
import cn.edu.hust.util.*;
+
+//导入 com.alibaba.fastjson.JSONObject 类。FastJSON 是一个用于处理 JSON 数据的库。
+
import com.alibaba.fastjson.JSONObject;
+
+//导入 org.apache.spark.Accumulator 类。这个类用于在 Spark 作业中共享和累加数据。
+
import org.apache.spark.Accumulator;
+
+//导入 org.apache.spark.SparkConf 类。这个类用于配置 Spark 应用程序。
+
import org.apache.spark.SparkConf;
+
+//导入 org.apache.spark.SparkContext 类。这个类是 Spark 应用程序的主入口点。
+
import org.apache.spark.SparkContext;
+
+//导入 org.apache.spark.api.java.JavaPairRDD 类。
+// 这个类是用于处理键值对的 RDD(弹性分布式数据集)
+
import org.apache.spark.api.java.JavaPairRDD;
+
+//导入 org.apache.spark.api.java.JavaRDD 类。这个类是用于处理普通 RDD 的 Java API。
+
import org.apache.spark.api.java.JavaRDD;
+
+//导入了 JavaSparkContext 类。
+// JavaSparkContext 是 org.apache.spark.api.java 包下的一个类,提供了与 Spark 集群交互的 Java API。
+
import org.apache.spark.api.java.JavaSparkContext;
+
+//导入 org.apache.spark.api.java.function 包下的所有函数类。
+// 这些类提供了常见的操作函数,如 FlatMapFunction, MapFunction, FilterFunction 等。
+
import org.apache.spark.api.java.function.*;
+
+//导入 org.apache.spark.sql.DataFrame 类。这个类是用于操作结构化数据的。
+
import org.apache.spark.sql.DataFrame;
+
+//导入 org.apache.spark.sql.Row 类。这个类是用于表示一行数据的。
+
import org.apache.spark.sql.Row;
+
+//导入 org.apache.spark.sql.SQLContext 类。
+// 这个类用于创建 Spark SQL 的上下文环境
+
import org.apache.spark.sql.SQLContext;
+
+//导入 org.apache.spark.sql.hive.HiveContext 类。
+//这个类用于与 Hive 兼容的 Spark SQL 上下文环境
+
import org.apache.spark.sql.hive.HiveContext;
+
+//导入 org.apache.spark.storage.StorageLevel 类。
+// 这个类定义了 Spark 中数据存储的级别。
+
import org.apache.spark.storage.StorageLevel;
+
+//导入 scala.Tuple2 类。这个类用于表示一个二元组(一个包含两个元素的元组)。
+
import scala.Tuple2;
import java.util.*;
@@ -36,86 +105,148 @@ import java.util.*;
public class UserVisitAnalyze {
public static void main(String[] args)
{
+
+ //初始化 args 参数数组,用于传递给 main 方法。
+ // 这里暂时设置为 {"1"},实际使用时可以根据需要调整。
args=new String[]{"1"};
/**
* 构建spark上下文
*/
+ //创建 SparkConf 对象并设置应用程序名称和运行模式(本地模式,使用3个核心)。
+ //使用 SparkConf 创建 JavaSparkContext 实例,初始化Spark上下文。
SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
JavaSparkContext context=new JavaSparkContext(conf);
+
+ //通过 JavaSparkContext 获取 SQLContext,用于执行SQL查询。
SQLContext sc=getSQLContext(context.sc());
- //生成模拟数据
+ //生成模拟数据,调用 mock 方法生成模拟数据到Spark环境中。
mock(context,sc);
//拿到相应的Dao组建
+ //通过 DaoFactory 获取 TaskDao 实例,用于数据库操作。
TaskDao dao= DaoFactory.getTaskDao();
+
//从外部传入的参数获取任务的id
+ //通过 args 参数获取任务ID。
Long taskId=ParamUtils.getTaskIdFromArgs(args);
+
//从数据库中查询出相应的task
+ //通过任务ID从数据库中查询任务信息,并将任务参数解析为 JSONObject
Task task=dao.findTaskById(taskId);
JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
//获取指定范围内的Sesssion
+ //调用 getActionRDD 方法获取包含指定范围内的Session的RDD。
JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject);
//这里增加一个新的方法,主要是映射
+ //将 sessionRangeDate 转换为包含键值对的 PairRDD。
JavaPairRDD sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
+
//重复用到的RDD进行持久化
+ //将 sessionInfoPairRDD 持久化到磁盘,提高后续操作的性能
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
- //上面的两个RDD是
+
//按照Sesson进行聚合
+ //调用 aggregateBySessionId 方法对Session信息进行聚合。
JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
//通过条件对RDD进行筛选
- // 重构,同时统计
+ //使用Accumulator进行统计:
+ //创建一个 Accumulator 来统计聚合结果。
Accumulator sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
-
+ //过滤和统计Session信息:
//在进行accumulator之前,需要aciton动作,不然会为空
+ //调用 filterSessionAndAggrStat 方法过滤并统计Session信息。
JavaPairRDD filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
+
//重复用到的RDD进行持久化
+ //将过滤后的RDD持久化到磁盘。
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
+
//获取符合过滤条件的全信息公共RDD
+ //调用 getFilterFullInfoRDD 方法获取包含完整信息的公共RDD。
JavaPairRDD commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
//重复用到的RDD进行持久化
+ //将公共RDD持久化到磁盘。
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
//session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
+
+
+
/**
* 重构实现的思路:
* 1。不要去生成任何的新RDD
+ *
* 2。不要去单独遍历一遍sesion的数据
+ *
* 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
+ *
* 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
+ *
* 开发Spark的经验准则
+ *
* 1。尽量少生成RDD
+ *
* 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
+ *
* 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
+ *
* shuffle操作,会导致大量的磁盘读写,严重降低性能
* 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
* 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
+ *
* 4。无论做什么功能,性能第一
+ *
* 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
* 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
* 一场灾难。
*/
+
+
/**
* 使用CountByKey算子实现随机抽取功能
*/
+ //调用 randomExtractSession 方法实现随机抽取功能。
randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
//在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算
//filteredSessionRDD.count();
//计算各个session占比,并写入MySQL
+
+ //调用 calculateAndPersist 方法计算并持久化聚合统计结果。
calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
- //获取热门品类数据Top10
+
+ //调用 getTop10Category 方法获取热门品类数据Top10
List> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
- //获取热门每一个品类点击Top10session
+
+ //调用 getTop10Session 方法获取热门品类点击Top10Session。
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
- //关闭spark上下文
+
+ //关闭Spark上下文,释放资源
context.close();
}
+ /**
+ * 功能总结
+ * 配置和初始化:配置Spark环境并初始化Spark上下文。
+ * 生成模拟数据:生成模拟数据到Spark环境中。
+ * 数据库操作:从数据库中获取任务信息。
+ * 数据处理:读取和处理Session数据,进行聚合、过滤、统计等操作。
+ * 持久化:将中间结果持久化以提高性能。
+ * 统计和汇总:计算并汇总统计数据。
+ * 随机抽取和输出:实现随机抽取功能并输出结果。
+ * 关闭资源:关闭Spark上下文,释放资源。
+ * 通过这些步骤,可以完成用户访问分析的整个流程。
+ */
+
+
+
+
+
@@ -124,25 +255,67 @@ public class UserVisitAnalyze {
* @param sc
* @return
*/
+ //getSQLContext 方法
public static SQLContext getSQLContext(SparkContext sc)
{
+ //通过 ConfigurationManager 获取配置项 SPARK_LOCAL,判断是否运行在本地模式下。
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
+
+ //判断是否为本地模式:
+ //如果是本地模式,则返回一个新的 SQLContext 实例。
+ // SQLContext 用于执行传统的SQL查询。
if(local)
{
return new SQLContext(sc);
}
+
+ //返回HiveContext:
+ //如果不是本地模式,则返回一个新的 HiveContext 实例。
+ // HiveContext 用于与Hive集成,执行Hive查询。
return new HiveContext(sc);
}
+ /**
+ * 功能解释
+ * 本地模式与Hive模式:此方法根据配置项 SPARK_LOCAL 的值来决定返回 SQLContext 还是 HiveContext。SPARK_LOCAL 为 true 时,
+ * 返回 SQLContext,用于执行传统的SQL查询;
+ * SPARK_LOCAL 为 false 时,返回 HiveContext,用于与Hive集成,执行Hive查询。
+ * 配置管理:使用 ConfigurationManager 来获取配置项,
+ * 确保配置项的灵活性和可维护性。
+ * ConfigurationManager.getBoolean 方法可以方便地获取布尔类型的配置项。
+ * @param context
+ * @param sc
+ */
+
+
+ //mock 方法
private static void mock(JavaSparkContext context,SQLContext sc)
{
+
+ //获取配置项:
+ //通过 ConfigurationManager
+ // 获取配置项 SPARK_LOCAL,判断是否运行在本地模式下。
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
+
+ //判断是否为本地模式:
+ //如果是本地模式,则调用 MockData.mock 方法生成模拟数据。
if(local)
{
MockData.mock(context,sc);
}
}
+ /**
+ * 功能解释
+ * 模拟数据生成:此方法根据配置项 SPARK_LOCAL 的值来决定是否生成模拟数据。
+ * SPARK_LOCAL 为 true 时,调用 MockData.mock 方法生成模拟数据到Spark环境中。
+ * 模拟数据:MockData.mock 方法的具体实现细节没有显示,
+ * 但通常用于生成模拟的RDD或DataFrame,以便在开发或测试阶段使用。
+ * 这有助于避免实际数据处理过程中可能出现的问题,同时便于进行单元测试和调试。
+ */
+
+
+
/**
* 获取指定日期范围内的数据
@@ -150,27 +323,82 @@ public class UserVisitAnalyze {
* @param taskParam
* @return
*/
+
+
+ //getActionRDD 方法
private static JavaRDD getActionRDD(SQLContext sc, JSONObject taskParam)
{
+
+ //获取开始时间和结束时间:
+ //从 taskParam 中获取开始时间 startTime 和结束时间 endTime。
+ // ParamUtils.getParam 方法用于从 JSONObject 中获取参数值。
String startTime=ParamUtils.getParam(taskParam,Constants.PARAM_STARTTIME);
String endTime=ParamUtils.getParam(taskParam,Constants.PARAM_ENDTIME);
+
+ //构建SQL查询语句
+ // 查询表 user_visit_action 中 date 字段在 startTime 和 endTime 之间的所有记录。
String sql="select *from user_visit_action where date>='"+startTime+"' and date<='"+endTime+"'";
+
+ //执行SQL查询并获取DataFrame:
+ //使用 SQLContext 执行SQL查询,获取查询结果的 DataFrame。
DataFrame df=sc.sql(sql);
+
+ //将DataFrame转换为JavaRDD
+ //方便后续的RDD操作。
return df.javaRDD();
}
+ /**
+ * 功能解释
+ * SQL查询:通过SQL查询从数据库中获取特定时间段内的用户访问数据。
+ * 数据转换:将查询结果的 DataFrame 转换为 JavaRDD,以便进行后续的数据处理操作。
+ */
+
+
+
+
+
/**
* 将数据进行映射成为Pair,键为SessionId,Value为Row
* @param sessionRangeDate
* @return
*/
+//getSessonInfoPairRDD 方法
+
+ //定义方法签名:
+ //定义一个静态方法 getSessonInfoPairRDD,该方法接受一个 JavaRDD 类型的参数 sessionRangeDate,
+ // 并返回一个 JavaPairRDD。
private static JavaPairRDD getSessonInfoPairRDD(JavaRDD sessionRangeDate) {
+
+
+ //使用 mapToPair 方法:
+ //使用 mapToPair 方法将 JavaRDD 转换为 JavaPairRDD。
+ // mapToPair 方法接受一个 PairFunction 实现类,
+ // 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2。
return sessionRangeDate.mapToPair(new PairFunction() {
@Override
+ //实现 PairFunction:
+ //实现 PairFunction 接口的 call 方法。
+ // call 方法会接受一个 Row 对象作为输入,并返回一个 Tuple2。
public Tuple2 call(Row row) throws Exception {
+
+ //构造 Tuple2 对象:
+ //从 Row 对象中提取指定列的值(这里是第3列),
+ // 作为键 String,并将整个 Row 对象作为值 Row,返回一个 Tuple2。
return new Tuple2(row.getString(2),row);
}
});
+ /**
+ * 功能解释
+ * 映射操作:使用 mapToPair 方法将每个 Row 对象转换为一个键值对 Tuple2。具体来说,
+ * 键是 Row 对象中的第3列的值,值是整个 Row 对象。
+ * 生成键值对:通过提取 Row 对象中的特定字段生成键值对,方便后续的聚合和处理操作。
+ * getSessonInfoPairRDD 方法:将 JavaRDD 转换为 JavaPairRDD,
+ * 其中键为 Row 对象中的 sessionId,
+ * 值为整个 Row 对象。
+ * 这有助于后续的聚合和处理操作,例如按 sessionId 分组统计或其他复杂操作。
+ */
+
}
/**
@@ -179,6 +407,12 @@ public class UserVisitAnalyze {
* @param sessionInfoPairRDD
* @return
*/
+
+ //方法签名:
+ //定义一个静态方法 aggregateBySessionId,
+ // 该方法接受一个 SQLContext
+ // 和一个 JavaPairRDD 类型的参数 sessionInfoPairRDD,
+ // 并返回一个 JavaPairRDD。
private static JavaPairRDD aggregateBySessionId(SQLContext sc, JavaPairRDD sessionInfoPairRDD) {
/**
* 先将数据映射成map格式
@@ -186,20 +420,31 @@ public class UserVisitAnalyze {
/**
*
代码重构
- JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() {
- @Override
- public Tuple2 call(Row row) throws Exception {
- return new Tuple2(row.getString(2),row);
- }
+ JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getString(2),row);
+ }
});*/
/**
* 根据sessionId进行分组
*/
+ //根据 sessionId 进行分组:
+ //使用 groupByKey 方法将 JavaPairRDD 按 sessionId 分组,
+ // 得到一个 JavaPairRDD>,
+ // 其中键为 sessionId,值为具有相同 sessionId 的所有 Row 对象的迭代器。
JavaPairRDD> sessionActionGrouped=sessionInfoPairRDD.groupByKey();
+ //映射生成键值对:
+ //使用 mapToPair 方法将每个 Tuple2> 转换为一个 Tuple2。
+ // 具体来说,键为 Row 对象中的 userId,值为生成的字符串信息。
JavaPairRDD sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction>, Long, String>() {
@Override
public Tuple2 call(Tuple2> stringIterableTuple2) throws Exception {
+
+ //提取关键信息:
+ //从 Iterable 中提取 sessionId 和 rows。
+ //初始化一些变量,用于存储搜索关键字、点击类别ID、用户ID、开始时间、结束时间和步骤长度。
String sessionId=stringIterableTuple2._1;
Iterable rows=stringIterableTuple2._2;
StringBuffer searchKeywords=new StringBuffer();
@@ -208,19 +453,27 @@ public class UserVisitAnalyze {
Date startTime=null;
Date endTime=null;
int stepLength=0;
+
+ //遍历每一行 Row 对象:
+ //遍历每个 Row 对象,如果 userId 为空,则将其设置为当前的 userId。
+ //提取 searchKeyword 和 clickCategoryId。
for (Row row:rows)
{
if(userId==null)
userId=row.getLong(1);
String searchKeyword=row.getString(5);
Long clickCategoryId=row.getLong(6);
+
+
//判断是否需要拼接
+ //拼接搜索关键字和点击类别ID:
+ //检查 searchKeyword 是否为空并拼接到 searchKeywords 中。
if(StringUtils.isNotEmpty(searchKeyword))
{
if(!searchKeywords.toString().contains(searchKeyword))
searchKeywords.append(searchKeyword+",");
}
-
+ //检查 clickCategoryId 是否为 null 并拼接到 clickCategoryIds 中。
if(clickCategoryId!=null)
{
if(!clickCategoryId.toString().contains(String.valueOf(clickCategoryId)))
@@ -228,6 +481,9 @@ public class UserVisitAnalyze {
}
//计算session开始时间和结束时间
+ //从 Row 对象中提取 actionTime。
+ //更新 startTime 和 endTime,确保它们分别为最小和最大时间。
+ //增加 stepLength 记录步骤数。
Date actionTime= DateUtils.parseTime(row.getString(4));
if(startTime==null)
startTime=actionTime;
@@ -241,11 +497,15 @@ public class UserVisitAnalyze {
{
endTime=actionTime;
}
- stepLength++;
+ stepLength++;
}
- //访问时长(s)
+
+
+ //计算访问时长:
+ //计算访问时长(以秒为单位)。
Long visitLengtth=(endTime.getTime()-startTime.getTime())/1000;
+ //格式化信息并返回:
String searchKeywordsInfo=StringUtils.trimComma(searchKeywords.toString());
String clickCategoryIdsInfo=StringUtils.trimComma(clickCategoryIds.toString());
String info=Constants.FIELD_SESSIONID+"="+sessionId+"|"+Constants.FIELD_SERACH_KEYWORDS+"="+searchKeywordsInfo+"|"
@@ -256,41 +516,97 @@ public class UserVisitAnalyze {
});
//查询所有的用户数据
- String sql="select * from user_info";
- JavaRDD userInfoRDD=sc.sql(sql).javaRDD();
- //将用户信息映射成map
- JavaPairRDD userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction() {
- @Override
- public Tuple2 call(Row row) throws Exception {
- return new Tuple2(row.getLong(0),row);
- }
- });
- //将两个信息join在一起
+ String sql="select * from user_info";
+
+ //执行SQL查询获取用户信息,并将结果转换为 JavaRDD。
+ JavaRDD userInfoRDD=sc.sql(sql).javaRDD();
+
+ //将用户信息映射成map
+ //使用 mapToPair 方法将 JavaRDD 转换为 JavaPairRDD,
+ // 其中键为 Row 对象中的某个列(通常是 userId),值为整个 Row 对象。
+ JavaPairRDD userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getLong(0),row);
+ }
+ });
+
+ //将两个信息join在一起
+ //使用 join 方法将两个 JavaPairRDD 进行连接,连接键为 userId。结果是一个 JavaPairRDD>,
+ // 其中键为 userId,值为一个包含用户会话信息和用户信息的元组。
JavaPairRDD> tuple2JavaPairRDD=sessionPartInfo.join(userInfoPariRDD);
+ /**
+ * aggregateBySessionId 方法:将用户会话数据按 sessionId 分组,并生成每个会话的详细信息。具体包括搜索关键字、点击类别ID、访问时长、步骤数和开始时间。
+ * 查询用户数据:从数据库中查询用户信息。
+ * 映射用户信息:将用户信息转换为 JavaPairRDD。
+ * 连接会话信息和用户信息:将用户会话信息和用户信息进行连接,生成包含用户详细信息的会话数据。
+ * 通过这些步骤,可以获取每个用户在不同会话中的详细行为信息,为后续的分析和统计提供基础数据。
+ */
+
+
+
+
/**
* 拿到所需的session
*/
+ //定义 JavaPairRDD 变量 sessionInfo:
+ //定义一个名为 sessionInfo 的 JavaPairRDD,
+ // 并使用 mapToPair 方法对 tuple2JavaPairRDD 进行映射操作。
JavaPairRDD sessionInfo=tuple2JavaPairRDD.mapToPair(new PairFunction>, String, String>() {
@Override
public Tuple2 call(Tuple2> longTuple2Tuple2) throws Exception {
+
+ //提取会话信息和用户信息:
+ //从 Tuple2> 中提取 sessionPartInfo 和 userInfo。
+ //sessionPartInfo 是会话的详细信息,userInfo 是用户的详细信息。
String sessionPartInfo=longTuple2Tuple2._2._1;
Row userInfo=longTuple2Tuple2._2._2;
+
+
//拿到需要的用户信息
+ //从 Row 对象 userInfo 中提取用户的基本信息,
+ // 包括年龄 age、职业 professional、城市 city 和性别 sex。
int age=userInfo.getInt(3);
String professional=userInfo.getString(4);
String city=userInfo.getString(5);
String sex=userInfo.getString(6);
+
//拼接字符串
+ //将会话信息 sessionPartInfo 和用户信息拼接成一个完整的字符串 fullInfo。
String fullInfo=sessionPartInfo+"|"+Constants.FIELD_AGE+"="+age+"|"
+Constants.FIELD_PROFESSIONAL+"="+professional+"|"+Constants.FIELD_CITY+"="+city+"|"+Constants.FIELD_SEX+"="+sex;
+
+ //提取会话ID:
+ //使用 StringUtils.getFieldFromConcatString 方法从 sessionPartInfo 中提取会话ID,
+ // 假设会话ID的格式是 Constants.FIELD_SESSIONID。
String session=StringUtils.getFieldFromConcatString(sessionPartInfo,"\\|",Constants.FIELD_SESSIONID);
+
+ //返回结果:
+ //返回一个 Tuple2,
+ // 其中键为会话ID session,值为包含会话信息和用户信息的字符串 fullInfo。
return new Tuple2(session,fullInfo);
}
});
+ //返回处理后的 sessionInfo。
return sessionInfo;
}
+/**
+ * 功能总结
+ * 功能:该方法将 tuple2JavaPairRDD 中包含的会话信息和用户信息进行合并,生成一个包含会话详细信息和用户详细信息的字符串。然后将这些信息封装为 JavaPairRDD。
+ * 关键步骤:
+ * 提取信息:从 tuple2JavaPairRDD 中提取会话信息和用户信息。
+ * 提取用户信息:从 Row 对象中提取用户的基本信息,包括年龄、职业、城市和性别。
+ * 拼接信息:将会话信息和用户信息拼接成一个完整的字符串。
+ * 提取会话ID:从会话信息中提取会话ID。
+ * 返回结果:返回一个包含会话ID和完整信息的 JavaPairRDD。
+ * 通过这些步骤,可以将用户会话数据和用户信息合并在一起,为后续的分析和统计提供更详细的数据。
+ */
+
+
+
+
/**
@@ -300,8 +616,17 @@ public class UserVisitAnalyze {
* @param sessionAggrStatAccumulator
* @return
*/
+
+ //方法签名
+ //方法 filterSessionAndAggrStat 接受三个参数:
+ //sessionInfoRDD:一个 JavaPairRDD,包含会话ID和会话信息。
+ //taskParam:一个 JSONObject,包含筛选条件。
+ //sessionAggrStatAccumulator:一个 Accumulator,用于累加统计信息。
private static JavaPairRDD filterSessionAndAggrStat(JavaPairRDD sessionInfoRDD, final JSONObject taskParam, final Accumulator sessionAggrStatAccumulator){
//得到条件
+ //从任务参数中提取筛选条件
+ //从 taskParam 中提取不同的筛选条件,
+ //如年龄范围、职业、城市、性别、搜索关键词和点击类别ID。
String startAge=ParamUtils.getParam(taskParam,Constants.PARAM_STARTAGE);
String endAge=ParamUtils.getParam(taskParam,Constants.PARAM_ENDAGE);
String professionals=ParamUtils.getParam(taskParam,Constants.PARAM_PROFESSONALS);
@@ -311,44 +636,60 @@ public class UserVisitAnalyze {
String categoryIds=ParamUtils.getParam(taskParam,Constants.PARAM_CLICK_CATEGORYIDS);
//拼接参数
+ //将提取的筛选条件拼接成一个字符串 _paramter,方便后续进行条件判断。
String _paramter=(startAge!=null?Constants.PARAM_STARTAGE+"="+startAge+"|":"")+
(endAge!=null?Constants.PARAM_ENDAGE+"="+endAge+"|":"")+(professionals!=null?Constants.PARAM_PROFESSONALS+"="+professionals+"|":"")+
(cities!=null?Constants.PARAM_CIYTIES+"="+cities+"|":"")+(sex!=null?Constants.PARAM_SEX+"="+sex+"|":"")+
(keyWords!=null?Constants.PARAM_SERACH_KEYWORDS+"="+keyWords+"|":"")+(categoryIds!=null?Constants.PARAM_CLICK_CATEGORYIDS+"="+categoryIds+"|":"");
+ //去除末尾多余的 |
+ //确保参数格式正确。
if(_paramter.endsWith("\\|"))
_paramter=_paramter.substring(0,_paramter.length()-1);
+ //定义最终参数
+ //使用 final 关键字定义最终参数 paramter,确保在 filter 函数中可以使用。
final String paramter=_paramter;
+
+ //过滤和统计会话数据
JavaPairRDD filteredSessionRDD=sessionInfoRDD.filter(new Function, Boolean>() {
@Override
public Boolean call(Tuple2 tuple2) throws Exception {
String sessionInfo=tuple2._2;
//按照条件进行过滤
+ //
//按照年龄进行过滤
if(!ValidUtils.between(sessionInfo,Constants.FIELD_AGE,paramter,Constants.PARAM_STARTAGE,Constants.PARAM_ENDAGE))
return false;
+ //
//按照职业进行过滤
if(!ValidUtils.in(sessionInfo,Constants.FIELD_PROFESSIONAL,paramter,Constants.PARAM_PROFESSONALS))
return false;
+ //
//按照城市进行过滤
if(!ValidUtils.in(sessionInfo,Constants.FIELD_CITY,paramter,Constants.PARAM_CIYTIES))
return false;
+ //
//按照性别进行筛选
if(!ValidUtils.equal(sessionInfo,Constants.FIELD_SEX,paramter,Constants.PARAM_SEX))
return false;
+ //
//按照搜索词进行过滤,只要有一个搜索词即可
if(!ValidUtils.in(sessionInfo,Constants.FIELD_SERACH_KEYWORDS,paramter,Constants.PARAM_PROFESSONALS))
return false;
+
if(!ValidUtils.in(sessionInfo,Constants.FIELD_CLICK_CATEGORYIDS,paramter,Constants.FIELD_CLICK_CATEGORYIDS))
return false;
+
//如果经过了之前的所有的过滤条件,也就是满足用户筛选条件
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
+
//计算出访问时长和访问步长的范围并进行相应的累加
Long visitLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_VISIT_LENGTH));
Long stepLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_STEP_LENGTH));
+
//使用函数进行统计
calculateVisitLength(visitLength);
calculateStepLength(stepLength);
@@ -356,6 +697,8 @@ public class UserVisitAnalyze {
}
//统计访问时长的数量
+ //从会话信息中提取访问时长 visitLength。
+ //根据 visitLength 的范围进行统计,累加到 sessionAggrStatAccumulator 中。
private void calculateVisitLength(Long visitLegth)
{
if(visitLegth>=1&&visitLegth<=3)
@@ -377,7 +720,10 @@ public class UserVisitAnalyze {
else if(visitLegth>1800)
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
}
+
//统计访问步长的数量
+ //从会话信息中提取访问步长 stepLength。
+ //根据 stepLength 的范围进行统计,累加到 sessionAggrStatAccumulator 中。
private void calculateStepLength(Long stepLength)
{
if(stepLength>=1&&stepLength<=3)
@@ -391,11 +737,31 @@ public class UserVisitAnalyze {
else if(stepLength>30&&stepLength<=60)
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
else if(stepLength>60)
- sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
});
+ //返回过滤后的会话数据 filteredSessionRDD。
return filteredSessionRDD;
}
+ /**
+ * 功能总结
+ * 功能:该方法用于过滤和统计用户会话数据。根据传入的任务参数(如年龄、职业、城市、性别、搜索关键词和点击类别ID),对会话数据进行筛选,并统计访问时长和访问步长的范围。
+ * 关键步骤:
+ * 提取筛选条件:从 taskParam 中提取筛选条件。
+ * 拼接参数:将筛选条件拼接成一个字符串 _paramter。
+ * 过滤会话数据:使用 filter 方法对会话数据进行筛选。
+ * 统计访问时长:根据访问时长进行范围统计。
+ * 统计访问步长:根据访问步长进行范围统计。
+ * 返回结果:返回过滤后的会话数据 filteredSessionRDD。
+ * 通过这些步骤,可以筛选出符合特定条件的会话数据,并进行访问时长和访问步长的统计,为后续的数据分析提供基础。
+ */
+
+
+
+
+
+
+
/**
*
@@ -404,16 +770,40 @@ public class UserVisitAnalyze {
* @param sessionInfoPairRDD
* @return
*/
+
+ //定义一个静态方法 getFilterFullInfoRDD,该方法接受两个参数:
+ //filteredSessionRDD:一个 JavaPairRDD,包含过滤后的会话信息。
+ //sessionInfoPairRDD:一个 JavaPairRDD,
+ // 包含完整的会话信息(会话ID和包含详细信息的 Row 对象)。
private static JavaPairRDD getFilterFullInfoRDD(JavaPairRDD filteredSessionRDD, JavaPairRDD sessionInfoPairRDD) {
//1.获取符合条件的session范围的所有品类
+ //使用 filteredSessionRDD 和 sessionInfoPairRDD 进行 join 操作,
+ // 将两个 RDD 中的相同 key(会话ID)的数据连接在一起。
+ //使用 mapToPair 方法对连接后的结果进行转换,生成新的 JavaPairRDD。
return filteredSessionRDD.join(sessionInfoPairRDD).mapToPair(new PairFunction>, String, Row>() {
@Override
public Tuple2 call(Tuple2> stringTuple2Tuple2) throws Exception {
+ //从 Tuple2> 中提取会话ID stringTuple2Tuple2._1
+ // 和完整的 Row 对象 stringTuple2Tuple2._2._2。
+ //将会话ID和完整的 Row 对象封装为一个新的 Tuple2 并返回。
return new Tuple2(stringTuple2Tuple2._1,stringTuple2Tuple2._2._2);
}
});
}
+ /**
+ * 功能总结
+ * 功能:该方法用于获取符合条件的会话范围的所有品类的详细信息。具体步骤如下:
+ * 连接两个 RDD:使用 join 方法将 filteredSessionRDD 和 sessionInfoPairRDD 中的相同会话ID的数据连接在一起。
+ * 提取详细信息:通过 mapToPair 方法提取每个连接后的 Tuple2 中的会话ID和完整的 Row 对象。
+ * 返回结果:返回一个新的 JavaPairRDD,其中每个 Row 对象包含了完整的会话信息。
+ */
+
+
+
+
+
+
/**
* 随机抽取Sesison功能
* @param taskId
@@ -421,606 +811,1355 @@ public class UserVisitAnalyze {
* @param sessionInfoPairRDD
*/
private static void randomExtractSession(final Long taskId, JavaPairRDD filteredSessionRDD, JavaPairRDD sessionInfoPairRDD) {
- //1.先将过滤Seesion进行映射,映射成为Time,Info的数据格式
+ // 1. 先将过滤Seesion进行映射,映射成为Time,Info的数据格式
+ // 下面这行代码使用mapToPair操作对filteredSessionRDD进行转换,创建一个新的JavaPairRDD(键值对形式的RDD)。
+ // 这里传入了一个PairFunction实现,用于定义如何将原有的Tuple2类型的数据转换为新的键值对形式。
final JavaPairRDD mapDataRDD=filteredSessionRDD.mapToPair(new PairFunction, String, String>() {
@Override
public Tuple2 call(Tuple2 tuple2) throws Exception {
String info=tuple2._2;
- //获取开始的时间
+ // 获取开始的时间,通过StringUtils工具类的getFieldFromConcatString方法从info字符串中提取相应字段,以"|"作为分隔符,提取的是Constants.FIELD_START_TIME所指定的字段(应该是定义好的表示开始时间的字段索引之类的)。
+
String startTime=StringUtils.getFieldFromConcatString(info,"\\|",Constants.FIELD_START_TIME);
+ // 对获取到的开始时间进行格式化处理,调用DateUtils工具类的getDateHour方法,将时间格式化为特定的小时格式(具体格式由该方法内部定义)。
+
String formatStartTime=DateUtils.getDateHour(startTime);
+ // 返回一个新的Tuple2,以格式化后的时间作为键,原info字符串作为值,这样就完成了将原始数据按照特定格式进行映射转换。
+
return new Tuple2(formatStartTime,info);
}
});
- //计算每一个小时的Session数量
+ // 计算每一个小时的Session数量,通过countByKey方法统计mapDataRDD中每个键(也就是经过格式化后的时间)对应的元素数量,
+ // 返回一个Map,键是时间,值是对应的数量。
Map mapCount=mapDataRDD.countByKey();
+
+ // 遍历mapCount,这个mapCount里存放了每个时间对应的Session数量情况。
//设计一个新的数据结构Map> dateHourCount,日期作为Key,时间和数量作为Map
Map> dateHourCountMap=new HashMap>();
//遍历mapCount
for (Map.Entry entry:mapCount.entrySet())
{
+ // 从键(格式应该是类似"日期_小时"这样的字符串)中分割出日期部分,以"_"作为分隔符取第一个元素。
String date=entry.getKey().split("_")[0];
+
+ // 从键中分割出小时部分,以"_"作为分隔符取第二个元素。
String hour=entry.getKey().split("_")[1];
+ // 根据日期从dateHourCountMap中获取对应的内层Map(存放小时和数量的映射),
+ // 如果不存在则返回null。
Map hourCount=dateHourCountMap.get(date);
+
+ // 如果对应的内层Map不存在,就创建一个新的HashMap用来存放该日期下各个小时的Session数量。
if(hourCount==null)
{
+ // 将新创建的内层Map放入外层的dateHourCountMap中,以日期作为键。
hourCount=new HashMap();
dateHourCountMap.put(date,hourCount);
}
+ // 将当前小时对应的Session数量放入内层的hourCount Map中,
+ // 这里需要将entry.getValue()转换为Long类型
+ // (因为countByKey返回的值类型是Object,实际应该是Long表示数量)。
hourCount.put(hour,(Long)entry.getValue());
+
+ // 功能解释:
+ // 整体这段代码的功能主要是对给定的过滤后的会话数据(filteredSessionRDD)进行处理。
+ // 首先将其按照特定格式进行映射转换,把会话信息中的开始时间提取并格式化后作为键,原会话信息作为值存放在新的mapDataRDD中。
+ // 接着统计每个格式化后的时间对应的会话数量,得到一个时间和数量的映射mapCount。
+ // 最后构建了一个更复杂的嵌套Map结构dateHourCountMap,按照日期对每个小时的会话数量进行分类组织,方便后续按照日期和小时维度来进一步分析和处理会话数据相关的统计信息等操作。
+
}
- //将数据按照天数平均
- int countPerday=100/dateHourCountMap.size();
- //实现一个随机函数后面将会用到
- Random random=new Random();
- //设计一个新的数据结构,用于存储随机索引,Key是每一天,Map是小时和随机索引列表构成的
- final Map>> dateRandomExtractMap=new HashMap>>();
- for (Map.Entry> dateHourCount:dateHourCountMap.entrySet())
- {
- //日期
- String date=dateHourCount.getKey();
- //每一天个Session个数
- Long sessionCount=0L;
- for(Map.Entry hourCountMap:dateHourCount.getValue().entrySet())
- {
- sessionCount+=hourCountMap.getValue();
- }
- //获取每一天随机存储的Map
- Map> dayExtactMap=dateRandomExtractMap.get(date);
- if(dayExtactMap==null)
- {
- dayExtactMap=new HashMap>();
- dateRandomExtractMap.put(date,dayExtactMap);
- }
+ int countPerday = 100 / dateHourCountMap.size();
+// 这行代码的目的是计算每天平均要抽取的数量。它用固定值100除以dateHourCountMap中不同日期的数量(也就是总天数),
+// 这里假设是想按照平均分配的方式来确定每天大致要抽取多少个会话数据,不过要注意如果dateHourCountMap.size()为0会导致除零异常,实际应用中可能需要额外处理这种边界情况。
- //遍历每一个小时,计算出每一个小时的Session占比和抽取的数量
+// 实现一个随机函数后面将会用到
+ Random random = new Random();
+// 创建一个Random对象,用于后续生成随机数,例如生成随机索引来实现随机抽取会话数据的功能。
- for(Map.Entry hourCountMap:dateHourCount.getValue().entrySet())
- {
- int extractSize= (int) ((double) hourCountMap.getValue()/sessionCount*countPerday);
+// 设计一个新的数据结构,用于存储随机索引,Key是每一天,Map是小时和随机索引列表构成的
+ final Map>> dateRandomExtractMap = new HashMap>>();
+// 创建一个外层是按照日期为键,内层是按照小时为键,值为长整型列表(用来存放随机索引)的嵌套HashMap结构,
+// 这个数据结构将用于存储针对每个日期下每个小时抽取会话数据时所用到的随机索引信息。
- //如果抽离的长度大于被抽取数据的长度,那么抽取的长度就是被抽取长度
- extractSize= extractSize>hourCountMap.getValue()?hourCountMap.getValue().intValue():extractSize;
+ for (Map.Entry> dateHourCount : dateHourCountMap.entrySet()) {
+ // 日期
+ String date = dateHourCount.getKey();
+ // 从dateHourCountMap的每一个元素(外层键值对,键是日期,值是包含小时和对应会话数量的内层Map)中获取日期部分,也就是外层键。
- //获取存储每一个小时的List
- List indexList=dayExtactMap.get(hourCountMap.getKey());
- if(indexList==null)
- {
- indexList=new ArrayList();
- dayExtactMap.put(hourCountMap.getKey(),indexList);
+ // 每一天个Session个数
+ Long sessionCount = 0L;
+ // 初始化一个变量用来统计当前日期下总的会话数量,初始值设为0。
+
+ for (Map.Entry hourCountMap : dateHourCount.getValue().entrySet()) {
+ // 遍历当前日期对应的内层Map(存放每个小时和对应的会话数量),获取每一个小时的会话数量,并累加到sessionCount变量中,
+ // 这样最终sessionCount就表示当前日期下总的会话数量了。
+ sessionCount += hourCountMap.getValue();
+ }
+
+ // 获取每一天随机存储的Map
+ Map> dayExtactMap = dateRandomExtractMap.get(date);
+ // 根据当前日期从dateRandomExtractMap中获取对应的内层Map(用来存放每个小时的随机索引列表的那个Map),如果不存在则返回null。
+
+ if (dayExtactMap == null) {
+ // 如果对应的内层Map不存在,就创建一个新的HashMap,用来存放当前日期下各个小时对应的随机索引列表。
+ dayExtactMap = new HashMap>();
+ // 将新创建的内层Map放入外层的dateRandomExtractMap中,以日期作为键,这样就完成了针对当前日期的内层Map初始化操作。
+ dateRandomExtractMap.put(date, dayExtactMap);
+ }
+
+ // 遍历每一个小时,计算出每一个小时的Session占比和抽取的数量
+ for (Map.Entry hourCountMap : dateHourCount.getValue().entrySet()) {
+ // 计算当前小时的会话数据抽取数量。先将当前小时的会话数量除以当天总的会话数量得到占比,再乘以每天平均要抽取的数量countPerday,
+ // 并将结果转换为整型作为当前小时预计抽取的数量,这里涉及到类型转换(从Long类型的会话数量等转换为int类型的抽取数量),要注意可能出现数据精度丢失等情况。
+ int extractSize = (int) ((double) hourCountMap.getValue() / sessionCount * countPerday);
+
+ // 如果抽离的长度大于被抽取数据的长度,那么抽取的长度就是被抽取长度
+ extractSize = extractSize > hourCountMap.getValue()? hourCountMap.getValue().intValue() : extractSize;
+ // 进行边界判断,如果计算出来的抽取数量大于当前小时实际的会话数量,那就把抽取数量设置为当前小时实际的会话数量,
+ // 避免出现抽取数量超出实际可抽取范围的情况,这里同样进行了类型转换确保赋值给int类型的extractSize变量。
+
+ // 获取存储每一个小时的List
+ List indexList = dayExtactMap.get(hourCountMap.getKey());
+ // 从当前日期对应的内层Map(dayExtactMap)中获取当前小时对应的随机索引列表,如果不存在则返回null。
+
+ if (indexList == null) {
+ // 如果对应的随机索引列表不存在,就创建一个新的ArrayList用来存放随机索引。
+ indexList = new ArrayList();
+ // 将新创建的列表放入dayExtactMap中,以当前小时作为键,这样就完成了针对当前小时的随机索引列表初始化操作。
+ dayExtactMap.put(hourCountMap.getKey(), indexList);
}
- //使用随机函数生成随机索引
- for(int i=0;i> time2GroupRDD=mapDataRDD.groupByKey();
- //将抽取的信息持久化到数据库,并返回SessionIds对,然后和以前的信息Join
- JavaPairRDD sessionIds= time2GroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() {
+// 功能解释:
+// 这段代码整体的功能是基于之前整理好的按日期和小时统计会话数量的dateHourCountMap数据结构,进行随机抽取会话数据的相关准备工作。
+// 首先计算出每天平均要抽取的会话数量(countPerday),然后创建了一个新的嵌套Map结构(dateRandomExtractMap)用于存储随机抽取会话数据所需的随机索引信息。
+// 接着针对每一个日期,先统计该日期下总的会话数量,然后对于该日期下的每一个小时,计算出按照比例应该抽取的会话数量(extractSize),同时考虑边界情况进行调整。
+// 之后针对每个小时,创建或获取对应的随机索引列表,通过随机函数生成指定数量(extractSize)的唯一随机索引并添加到列表中。
+// 最终目的是构建好dateRandomExtractMap这个数据结构,为后续依据这些随机索引从原始会话数据中真正抽取相应的会话数据提供依据,实现按照一定规则随机抽取会话数据的功能,以便进行例如抽样分析等相关业务操作。
+
+// 2.将上面计算的RDD进行分组,然后使用FlatMap进行压平,然后判断是否在索引中,如果在,那么将这个信息持久化
+ JavaPairRDD> time2GroupRDD = mapDataRDD.groupByKey();
+// 调用mapDataRDD(应该是之前已经存在且经过一定处理的JavaPairRDD)的groupByKey方法进行分组操作。
+// 按照元素的键进行分组,将具有相同键的元素的对应值汇聚在一起,形成新的键值对存储在time2GroupRDD中,
+// 新的JavaPairRDD中键的类型为String,值的类型为Iterable,表示对应键的多个值组成的可迭代集合。
+ // 将抽取的信息持久化到数据库,并返回SessionIds对,然后和以前的信息Join
+ JavaPairRDD sessionIds = time2GroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() {
+ // 调用time2GroupRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例,
+ // 用于定义将输入的键值对(这里输入类型是Tuple2>,即前面分组后的结果类型)
+ // 转换为零个、一个或多个新的键值对(输出类型是Tuple2,也就是新的JavaPairRDD的键值对类型)的逻辑。
@Override
public Iterable> call(Tuple2> tuple2) throws Exception {
- String dateStr=tuple2._1;
- String date=dateStr.split("_")[0];
- String hour=dateStr.split("_")[1];
- //使用一个List存储sessionId
- List> sessionIds=new ArrayList>();
- List indexList=dateRandomExtractMap.get(date).get(hour);
- //使用一个list保存需要持久化到数据库的对象
- List sessionRandomExtractList=new ArrayList();
- int index=0;
- for (String infos:tuple2._2) {
- if(indexList.contains(Long.valueOf(index)))
- {
- //构建SessionRandomExtract
- SessionRandomExtract sessionRandomExtract=new SessionRandomExtract();
- final String sessionId=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SESSIONID);
- String startTime=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_START_TIME);
- String searchKeyWards=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SERACH_KEYWORDS);
- String clickCategoryIds=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_CLICK_CATEGORYIDS);
+ // 当执行flatMapToPair操作时,会针对time2GroupRDD中的每一个元素(即每个键值对)调用这个call方法,
+ // 参数tuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2>。
+ String dateStr = tuple2._1;
+ // 从传入的键值对tuple2中获取其键(Key)部分,并将其赋值给dateStr变量,
+ // 从后续代码对dateStr的处理来看,这个键应该是按照特定格式组织的字符串,可能包含日期和时间等相关信息。
+ String date = dateStr.split("_")[0];
+ // 调用split方法,以“_”作为分隔符,对dateStr字符串进行拆分操作,取拆分后得到的第一个子字符串,
+ // 并将其赋值给date变量,按照推测,这个部分应该是代表日期的信息,具体格式依实际业务而定。
+ String hour = dateStr.split("_")[1];
+ // 同样以“_”作为分隔符拆分dateStr,取拆分后的第二个子字符串,赋值给hour变量,
+ // 通常其代表的是小时相关的内容,具体含义要结合业务场景判断。通过这样的拆分操作,后续可依据日期和小时查找对应索引等信息来进一步处理数据。
+ // 使用一个List存储sessionId
+ List> sessionIds = new ArrayList>();
+ // 创建一个名为sessionIds的ArrayList列表,用于存储特定格式的键值对,其键和值的类型都是String,
+ // 主要用来存放与sessionId相关的键值对,方便后续和其他数据集基于sessionId进行关联等操作,提前整理好对应的数据格式进行存储。
+ List indexList = dateRandomExtractMap.get(date).get(hour);
+ // 这里涉及到dateRandomExtractMap这个变量,推测它是类似嵌套的映射结构(可能是Map>>类型),
+ // 外层的键对应日期,内层的键对应小时,值是由Long类型元素组成的列表。
+ // 首先通过dateRandomExtractMap.get(date)依据前面获取到的date(代表日期的字符串)查找对应日期下的内层映射,
+ // 再通过.get(hour)根据hour(代表小时的字符串)从内层映射中获取对应小时下存储的Long类型元素组成的列表,此列表就是indexList,
+ // 后续将用它作为判断当前处理的数据中哪些元素需要进行持久化等操作的依据。
+ // 使用一个list保存需要持久化到数据库的对象
+ List sessionRandomExtractList = new ArrayList();
+ // 创建一个名为sessionRandomExtractList的ArrayList列表,用于存放SessionRandomExtract类型的对象。
+ // SessionRandomExtract是自定义的实体类,用于封装从输入数据中提取出来、准备持久化到数据库的业务相关数据。
+ // 后续会把符合特定条件的数据封装成该类的对象,并添加到这个列表中,虽当前代码没展示具体持久化数据库的操作,但整体是为后续做准备。
+ int index = 0;
+ // 初始化一个名为index的整型变量,并将其初始值设为0,用于在后续遍历tuple2的值(Iterable部分)中的每个元素时,
+ // 作为元素的索引计数,方便和前面获取到的indexList中的索引值进行比较,以判断每个元素是否需要进行相应处理操作。
+ for (String infos : tuple2._2) {
+ // 开始一个for循环,用于遍历tuple2键值对中的值部分(即tuple2._2,其类型是Iterable),
+ // 在每次循环中,会取出其中一个字符串元素赋值给infos变量,后续就在循环体内部对每个infos字符串进行相关解析和处理操作。
+ if (indexList.contains(Long.valueOf(index))) {
+ // 在循环体中,先将当前的index(正在遍历的元素在tuple2._2中的索引位置)转换为Long类型(因为indexList是Long类型的列表),
+ // 然后通过contains方法判断这个索引值是否在indexList中。若在其中,意味着当前遍历到的infos对应的元素需进行后续特定处理,
+ // 如下文构建对象、提取信息并持久化等操作。
+ // 构建SessionRandomExtract
+ SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
+ // 创建一个SessionRandomExtract类型的对象sessionRandomExtract,它用于封装从infos字符串中提取的业务相关信息,
+ // 后续会对其各个属性进行赋值操作,再将其添加到sessionRandomExtractList列表中,最终实现将业务信息持久化到数据库的目的。
+ final String sessionId = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SESSIONID);
+ // 通过调用StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 从infos字符串中按照“|”作为分隔符提取出对应Constants.FIELD_SESSIONID这个字段对应的内容,
+ // Constants.FIELD_SESSIONID应是在Constants类中定义好的常量,用于标识sessionId在字符串中的位置等信息,
+ // 提取出的内容赋值给sessionId变量。
+ String startTime = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_START_TIME);
+ // 同理,从infos字符串中提取出代表开始时间相关字段内容,赋值给startTime变量,用于后续给sessionRandomExtract对象设置属性值。
+ String searchKeyWards = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SERACH_KEYWORDS);
+ // 从infos字符串中提取出代表搜索关键词相关字段内容,赋值给searchKeyWards变量,为后续设置对象属性做准备。
+ String clickCategoryIds = StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_CLICK_CATEGORYIDS);
+ // 从infos字符串中提取出代表点击类别IDs相关字段内容,赋值给clickCategoryIds变量,以便后续设置对象属性。
sessionRandomExtract.set(taskId,sessionId,startTime,searchKeyWards,clickCategoryIds);
- //添加到List中然后持久化到数据库中
+ // 调用sessionRandomExtract对象的set方法(假设SessionRandomExtract类有此方法用于设置各属性值),
+ // 将前面提取出来的各个业务相关信息以及外部定义好的taskId(与当前任务相关的标识,外部已赋值确定)设置到sessionRandomExtract对象中,
+ // 完成对该对象的属性赋值操作,使其封装好要持久化的数据信息。
+ // 添加到List中然后持久化到数据库中
sessionRandomExtractList.add(sessionRandomExtract);
+ // 将已经封装好业务信息的sessionRandomExtract对象添加到sessionRandomExtractList列表中,
+ // 后续应会有其他代码利用这个列表中的对象进行数据库持久化操作,将这些对象代表的数据插入数据库相应表中存储。
sessionIds.add(new Tuple2(sessionId,sessionId));
+ // 创建一个键值对,其键和值都是当前提取出来的sessionId,然后将这个键值对添加到前面创建的sessionIds列表中,
+ // 这样做可能是为了后续和其他数据集基于sessionId进行关联操作(如Join操作)时方便整理出对应的关联数据对,便于后续整合和分析数据。
}
index++;
+ // 在每次循环结束后,将index变量的值自增1,使其指向下一个元素在tuple2._2中的索引位置,
+ // 以便在下一轮循环中正确判断对应元素是否在indexList中,从而继续对下一个元素进行相应处理操作。
}
- //持久化到数据库
- DaoFactory.getSessionRandomExtractDao().batchInsert(sessionRandomExtractList);
return sessionIds;
+ // 返回包含sessionId键值对的列表sessionIds,这是flatMapToPair方法要求的返回类型,
+ // 最终这些返回的键值对会构成新的JavaPairRDD(即前面定义的sessionIds变量所代表的RDD)。
}
});
-
- //3. 获取session的明细数据保存到数据库
- JavaPairRDD> sessionDetailRDD= sessionIds.join(sessionInfoPairRDD);
- sessionDetailRDD.foreachPartition(new VoidFunction>>>() {
- @Override
- public void call(Iterator>> tuple2Iterator) throws Exception {
- List sessionDetailList=new ArrayList();
- while(tuple2Iterator.hasNext())
- {
- Tuple2> tuple2=tuple2Iterator.next();
- Row row=tuple2._2._2;
- String sessionId=tuple2._1;
- Long userId=row.getLong(1);
- Long pageId=row.getLong(3);
- String actionTime=row.getString(4);
- String searchKeyWard=row.getString(5);
- Long clickCategoryId=row.getLong(6);
- Long clickProducetId=row.getLong(7);
- String orderCategoryId=row.getString(8);
- String orderProducetId=row.getString(9);
- String payCategoryId=row.getString(10);
- String payProducetId=row.getString(11);
- SessionDetail sessionDetail=new SessionDetail();
- sessionDetail.set(taskId,userId,sessionId,pageId,actionTime,searchKeyWard,clickCategoryId,clickProducetId,orderCategoryId,orderProducetId,payCategoryId,payProducetId);
- sessionDetailList.add(sessionDetail);
- }
- DaoFactory.getSessionDetailDao().batchInsert(sessionDetailList);
- }
- });
-
}
//计算各个范围的占比,并持久化到数据库
- private static void calculateAndPersist(String value,Long taskId) {
+ private static void calculateAndPersist(String value, Long taskId) {
+ // 此方法用于根据传入的字符串value和任务ID taskId进行相关数据的计算,
+ // 并将计算结果封装成对象后持久化到数据库中,整体是数据处理及存储相关的逻辑。
+
//System.out.println(value);
- Long sessionCount=Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));
- //各个范围的访问时长
- Double visit_Length_1s_3s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));
- Double visit_Length_4s_6s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));
- Double visit_Length_7s_9s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));
- Double visit_Length_10s_30s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));
- Double visit_Length_30s_60s=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));
- Double visit_Length_1m_3m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));
- Double visit_Length_3m_10m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));
- Double visit_Length_10m_30m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));
- Double visit_Length_30m=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));
-
- //各个范围的访问步长
- Double step_Length_1_3=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));
- Double step_Length_4_6=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));
- Double step_Length_7_9=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));
- Double step_Length_10_30=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));
- Double step_Length_30_60=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));
- Double step_Length_60=Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));
-
- //访问时长对应的sesison占比,保留3位小数
- double visit_Length_1s_3s_ratio=NumberUtils.formatDouble(visit_Length_1s_3s/sessionCount,3);
- double visit_Length_4s_6s_ratio=NumberUtils.formatDouble(visit_Length_4s_6s/sessionCount,3);
- double visit_Length_7s_9s_ratio=NumberUtils.formatDouble(visit_Length_7s_9s/sessionCount,3);
- double visit_Length_10s_30s_ratio=NumberUtils.formatDouble(visit_Length_10s_30s/sessionCount,3);
- double visit_Length_30s_60s_ratio=NumberUtils.formatDouble(visit_Length_30s_60s/sessionCount,3);
- double visit_Length_1m_3m_ratio=NumberUtils.formatDouble(visit_Length_1m_3m/sessionCount,3);
- double visit_Length_3m_10m_ratio=NumberUtils.formatDouble(visit_Length_3m_10m/sessionCount,3);
- double visit_Length_10m_30m_ratio=NumberUtils.formatDouble(visit_Length_10m_30m/sessionCount,3);
- double visit_Length_30m_ratio=NumberUtils.formatDouble(visit_Length_30m/sessionCount,3);
-
- //访问步长对应的session占比,保留3位小数
- double step_Length_1_3_ratio= NumberUtils.formatDouble(step_Length_1_3/sessionCount,3);
- double step_Length_4_6_ratio=NumberUtils.formatDouble(step_Length_4_6/sessionCount,3);
- double step_Length_7_9_ratio=NumberUtils.formatDouble(step_Length_7_9/sessionCount,3);
- double c=NumberUtils.formatDouble(step_Length_10_30/sessionCount,3);
- double step_Length_30_60_ratio=NumberUtils.formatDouble(step_Length_30_60/sessionCount,3);
- double step_Length_60_ratio=NumberUtils.formatDouble(step_Length_60/sessionCount,3);
-
- SessionAggrStat sessionAggrStat=new SessionAggrStat();
- sessionAggrStat.set(taskId,sessionCount,visit_Length_1s_3s_ratio,visit_Length_4s_6s_ratio,
- visit_Length_7s_9s_ratio,visit_Length_10s_30s_ratio,visit_Length_30s_60s_ratio,
- visit_Length_1m_3m_ratio,visit_Length_3m_10m_ratio,visit_Length_10m_30m_ratio,visit_Length_30m_ratio
- ,step_Length_1_3_ratio,step_Length_4_6_ratio,step_Length_7_9_ratio,step_Length_7_9_ratio,step_Length_30_60_ratio,step_Length_60_ratio);
- List sessionAggrStatList=new ArrayList();
+ // 这行代码被注释掉了,原本可能是用于调试输出value的值,查看传入的字符串具体内容。
+
+ Long sessionCount = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从传入的value字符串中提取出对应Constants.SESSION_COUNT(应该是在Constants类中定义好的常量,
+ // 用于标识sessionCount在字符串中的位置等信息)这个字段对应的内容,然后将其转换为Long类型,赋值给sessionCount变量,
+ // 该变量用于记录会话数量相关信息。
+
+ // 各个范围的访问时长
+ Double visit_Length_1s_3s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s));
+ // 同样使用StringUtils工具类按照“|”分隔符从value字符串中提取出对应Constants.TIME_PERIOD_1s_3s字段对应的内容,
+ // 并转换为Double类型,赋值给visit_Length_1s_3s变量,用于记录访问时长在1秒到3秒这个范围的相关数据。
+
+ Double visit_Length_4s_6s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s));
+ // 从value字符串中提取对应Constants.TIME_PERIOD_4s_6s字段内容并转换为Double类型,赋值给visit_Length_4s_6s变量,
+ // 用于记录访问时长在4秒到6秒范围的数据。
+
+ Double visit_Length_7s_9s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s));
+ // 提取value字符串中对应Constants.TIME_PERIOD_7s_9s字段内容转换为Double类型,赋值给visit_Length_7s_9s变量,
+ // 代表访问时长在7秒到9秒范围的数据。
+
+ Double visit_Length_10s_30s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s));
+ // 按指定分隔符从value字符串中提取对应字段内容并转换为Double类型,赋值给visit_Length_10s_30s变量,
+ // 用于记录访问时长在10秒到30秒范围的数据。
+
+ Double visit_Length_30s_60s = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s));
+ // 从value字符串提取对应字段内容转换为Double类型后赋值给visit_Length_30s_60s变量,
+ // 表示访问时长在30秒到60秒范围的数据。
+
+ Double visit_Length_1m_3m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m));
+ // 提取value字符串中对应Constants.TIME_PERIOD_1m_3m字段内容转换为Double类型,赋值给visit_Length_1m_3m变量,
+ // 用于记录访问时长在1分钟到3分钟范围的数据(这里的“m”应该表示分钟,结合业务场景理解时长范围)。
+
+ Double visit_Length_3m_10m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m));
+ // 按照分隔符从value字符串获取对应字段内容转换为Double类型,赋值给visit_Length_3m_10m变量,
+ // 代表访问时长在3分钟到10分钟范围的数据。
+
+ Double visit_Length_10m_30m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m));
+ // 从value字符串提取对应字段内容转换为Double类型后赋值给visit_Length_10m_30m变量,
+ // 用于记录访问时长在10分钟到30分钟范围的数据。
+
+ Double visit_Length_30m = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m));
+ // 提取value字符串中对应Constants.TIME_PERIOD_30m字段内容转换为Double类型,赋值给visit_Length_30m变量,
+ // 表示访问时长大于30分钟的数据(这里根据变量名及业务逻辑推测其含义)。
+
+ // 各个范围的访问步长
+ Double step_Length_1_3 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3));
+ // 使用StringUtils工具类从value字符串中提取对应Constants.STEP_PERIOD_1_3字段内容并转换为Double类型,
+ // 赋值给step_Length_1_3变量,用于记录访问步长在1到3这个范围的数据(具体单位等需结合业务确定)。
+
+ Double step_Length_4_6 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6));
+ // 从value字符串提取对应Constants.STEP_PERIOD_4_6字段内容转换为Double类型,赋值给step_Length_4_6变量,
+ // 代表访问步长在4到6范围的数据。
+
+ Double step_Length_7_9 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9));
+ // 按照分隔符从value字符串中获取对应字段内容转换为Double类型,赋值给step_Length_7_9变量,
+ // 用于记录访问步长在7到9范围的数据。
+
+ Double step_Length_10_30 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30));
+ // 提取value字符串中对应Constants.STEP_PERIOD_10_30字段内容转换为Double类型,赋值给step_Length_10_30变量,
+ // 表示访问步长在10到30范围的数据。
+
+ Double step_Length_30_60 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60));
+ // 从value字符串获取对应字段内容转换为Double类型后赋值给step_Length_30_60变量,
+ // 用于记录访问步长在30到60范围的数据。
+
+ Double step_Length_60 = Double.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60));
+ // 提取value字符串中对应Constants.STEP_PERIOD_60字段内容转换为Double类型,赋值给step_Length_60变量,
+ // 代表访问步长大于60的数据(根据变量名及业务场景推测其含义)。
+
+ // 访问时长对应的sesison占比,保留3位小数
+ double visit_Length_1s_3s_ratio = NumberUtils.formatDouble(visit_Length_1s_3s / sessionCount, 3);
+ // 先计算访问时长在1秒到3秒范围的数据(visit_Length_1s_3s)与会话数量(sessionCount)的比值,
+ // 然后通过NumberUtils工具类(自定义的用于数字处理的工具类,推测有格式化数字的功能)的formatDouble方法,
+ // 将该比值保留3位小数,结果赋值给visit_Length_1s_3s_ratio变量,用于表示此访问时长范围对应的会话占比情况。
+
+ double visit_Length_4s_6s_ratio = NumberUtils.formatDouble(visit_Length_4s_6s / sessionCount, 3);
+ // 计算访问时长在4秒到6秒范围的数据与会话数量的比值,并使用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_4s_6s_ratio变量,代表该时长范围对应的会话占比。
+
+ double visit_Length_7s_9s_ratio = NumberUtils.formatDouble(visit_Length_7s_9s / sessionCount, 3);
+ // 计算访问时长在7秒到9秒范围的数据和会话数量的比值,经NumberUtils工具类格式化保留3位小数后,
+ // 赋值给visit_Length_7s_9s_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_10s_30s_ratio = NumberUtils.formatDouble(visit_Length_10s_30s / sessionCount, 3);
+ // 计算访问时长在10秒到30秒范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_10s_30s_ratio变量,表示该时长范围对应的会话占比。
+
+ double visit_Length_30s_60s_ratio = NumberUtils.formatDouble(visit_Length_30s_60s / sessionCount, 3);
+ // 计算访问时长在30秒到60秒范围的数据和会话数量的比值,利用NumberUtils工具类保留3位小数后,
+ // 赋值给visit_Length_30s_60s_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_1m_3m_ratio = NumberUtils.formatDouble(visit_Length_1m_3m / sessionCount, 3);
+ // 计算访问时长在1分钟到3分钟范围的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数,
+ // 赋值给visit_Length_1m_3m_ratio变量,代表该时长范围对应的会话占比。
+
+ double visit_Length_3m_10m_ratio = NumberUtils.formatDouble(visit_Length_3m_10m / sessionCount, 3);
+ // 计算访问时长在3分钟到10分钟范围的数据和会话数量的比值,使用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_3m_10m_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double visit_Length_10m_30m_ratio = NumberUtils.formatDouble(visit_Length_10m_30m / sessionCount, 3);
+ // 计算访问时长在10分钟到30分钟范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_10m_30m_ratio变量,表示该时长范围对应的会话占比。
+
+ double visit_Length_30m_ratio = NumberUtils.formatDouble(visit_Length_30m / sessionCount, 3);
+ // 计算访问时长大于30分钟的数据与会话数量的比值,利用NumberUtils工具类保留3位小数,
+ // 赋值给visit_Length_30m_ratio变量,用于记录此情况对应的会话占比。
+
+ // 访问步长对应的session占比,保留3位小数
+ double step_Length_1_3_ratio = NumberUtils.formatDouble(step_Length_1_3 / sessionCount, 3);
+ // 计算访问步长在1到3范围的数据(step_Length_1_3)与会话数量(sessionCount)的比值,
+ // 借助NumberUtils工具类的formatDouble方法保留3位小数,赋值给step_Length_1_3_ratio变量,
+ // 用于表示该访问步长范围对应的会话占比情况。
+
+ double step_Length_4_6_ratio = NumberUtils.formatDouble(step_Length_4_6 / sessionCount, 3);
+ // 计算访问步长在4到6范围的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数后,
+ // 赋值给step_Length_4_6_ratio变量,代表该步长范围对应的会话占比。
+
+ double step_Length_7_9_ratio = NumberUtils.formatDouble(step_Length_7_9 / sessionCount, 3);
+ // 计算访问步长在7到9范围的数据和会话数量的比值,使用NumberUtils工具类保留3位小数,
+ // 赋值给step_Length_7_9_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double c = NumberUtils.formatDouble(step_Length_10_30 / sessionCount, 3);
+ // 计算访问步长在10到30范围的数据与会话数量的比值,通过NumberUtils工具类保留3位小数,
+ // 赋值给c变量(这里变量名可能不太符合规范,推测应该是step_Length_10_30_ratio之类更表意清晰的名称,需根据实际情况确认),
+ // 用于表示该步长范围对应的会话占比情况。
+
+ double step_Length_30_60_ratio = NumberUtils.formatDouble(step_Length_30_60 / sessionCount, 3);
+ // 计算访问步长在30到60范围的数据和会话数量的比值,利用NumberUtils工具类保留3位小数后,
+ // 赋值给step_Length_30_60_ratio变量,用于记录此范围对应的会话占比情况。
+
+ double step_Length_60_ratio = NumberUtils.formatDouble(step_Length_60 / sessionCount, 3);
+ // 计算访问步长大于60的数据与会话数量的比值,经NumberUtils工具类格式化保留3位小数,
+ // 赋值给step_Length_60_ratio变量,代表该情况对应的会话占比。
+
+ SessionAggrStat sessionAggrStat = new SessionAggrStat();
+ // 创建一个SessionAggrStat类型的对象sessionAggrStat,此类应该是自定义的用于封装要持久化到数据库的相关数据的实体类。
+
+ sessionAggrStat.set(taskId, sessionCount, visit_Length_1s_3s_ratio, visit_Length_4s_6s_ratio,
+ visit_Length_7s_9s_ratio, visit_Length_10s_30s_ratio, visit_Length_30s_60s_ratio,
+ visit_Length_1m_3m_ratio, visit_Length_3m_10m_ratio, visit_Length_10m_30m_ratio, visit_Length_30m_ratio
+ , step_Length_1_3_ratio, step_Length_4_6_ratio, step_Length_7_9_ratio, step_Length_7_9_ratio, step_Length_30_60_ratio, step_Length_60_ratio);
+ // 调用sessionAggrStat对象的set方法(假设SessionAggrStat类有此方法用于设置对象的各个属性值),
+ // 将前面计算好的任务ID、会话数量、各访问时长及访问步长对应的会话占比等数据设置到sessionAggrStat对象中,
+ // 完成对该对象属性的赋值操作,使其封装好要持久化到数据库的全部数据信息。
+
+ List sessionAggrStatList = new ArrayList();
+ // 创建一个名为sessionAggrStatList的ArrayList列表,用于存放SessionAggrStat类型的对象,
+ // 在这里主要是为了方便后续批量插入数据库操作,将单个封装好数据的对象放入列表中进行统一处理。
+
sessionAggrStatList.add(sessionAggrStat);
+ // 将前面创建并赋值好的sessionAggrStat对象添加到sessionAggrStatList列表中,准备进行批量插入数据库操作。
+
// 插入数据库
DaoFactory.getSessionAggrStatDao().batchInsert(sessionAggrStatList);
+ // 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getSessionAggrStatDao方法,
+ // 获取到用于操作SessionAggrStat数据的DAO对象,然后调用其batchInsert方法,
+ // 将包含要持久化数据的sessionAggrStatList列表传入,实现将数据批量插入到数据库中的功能,完成整个数据持久化流程。
}
+
+
/**
* 获取top10热门品类
* @param taskId
* @param sessionId2DetailRDD
*/
- private static List> getTop10Category(Long taskId, JavaPairRDD sessionId2DetailRDD) {
- //1.第一步已抽离出方法getFilterFullInfoRDD
- //2。获取访问的品类id,访问过表示点击,下单,支付
- JavaPairRDD categoryRDD=sessionId2DetailRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ private static List> getTop10Category(Long taskId, JavaPairRDD sessionId2DetailRDD) {
+ // 此方法名为getTop10Category,作用是获取排名前10的品类相关信息(具体从返回值及后续代码逻辑推测),
+ // 接收一个任务ID(taskId)和一个JavaPairRDD类型的sessionId2DetailRDD作为参数,
+ // 下面的代码将基于这个RDD逐步提取、处理品类相关数据,这里先从获取访问的品类id开始着手处理。
+
+ // 1.第一步已抽离出方法getFilterFullInfoRDD
+ // 此处表明前面应该有一个名为getFilterFullInfoRDD的方法已经执行过了,它完成了一部分前置的数据处理操作,
+ // 不过这里并没有展示该方法具体的实现内容,但本方法是在此基础上继续后续流程的。
+
+ // 2。获取访问的品类id,访问过表示点击,下单,支付
+ JavaPairRDD categoryRDD = sessionId2DetailRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() {
+ // 调用sessionId2DetailRDD的flatMapToPair方法,传入一个实现了PairFlatMapFunction接口的匿名内部类实例。
+ // 这个接口用于定义将输入的键值对(此处输入类型是Tuple2,也就是sessionId2DetailRDD里元素的类型)
+ // 转换为零个、一个或多个新的键值对(输出类型为Tuple2,是即将生成的新JavaPairRDD的键值对类型)的逻辑,
+ // 目的在于从原始的RDD数据中提取出与访问品类相关的信息,并整理成新的键值对形式方便后续处理。
+
@Override
public Iterable> call(Tuple2 stringRowTuple2) throws Exception {
- Row row=stringRowTuple2._2;
- List> visitCategoryList=new ArrayList>();
- Long clickCategoryId=row.getLong(6);
- //点击品类的id
- if(clickCategoryId!=null)
- visitCategoryList.add(new Tuple2(clickCategoryId,clickCategoryId));
-
- if(row.get(8)!=null){
- String[] orderCategoryIdsSplited=row.getString(8).split(",");
- for (String orderCategoryId:
+ // 重写了PairFlatMapFunction接口中的call方法,当执行flatMapToPair操作时,针对sessionId2DetailRDD中的每一个元素(每个键值对),
+ // 都会调用这个call方法进行处理,参数stringRowTuple2就是当前正在处理的那个键值对元素,其类型符合接口定义中的Tuple2。
+
+ Row row = stringRowTuple2._2;
+ // 从传入的键值对stringRowTuple2中获取其值部分(是Row类型的数据),赋值给row变量。
+ // Row类型通常代表一行包含多个字段的数据,后续会从这个row中提取出与品类相关的各个字段信息,例如点击品类的id等。
+
+ List> visitCategoryList = new ArrayList>();
+ // 创建一个名为visitCategoryList的ArrayList列表,用于存放即将提取出来的与访问品类相关的键值对信息,
+ // 其键和值的类型都为Long,后续会根据不同的业务场景(点击、下单、支付)把相应的品类ID信息封装成键值对添加到这个列表中。
+
+ Long clickCategoryId = row.getLong(6);
+ // 从row数据行中获取索引为6的字段值(从代码上下文推测这个字段应该代表点击品类的ID),并将其转换为Long类型,赋值给clickCategoryId变量。
+ // 通过索引获取字段的方式表明Row对象内部的数据结构类似数组或者列表,每个索引位置对应不同的业务含义的字段,具体需要结合数据来源和定义来确定。
+
+ // 点击品类的id
+ if (clickCategoryId!= null)
+ visitCategoryList.add(new Tuple2(clickCategoryId, clickCategoryId));
+ // 判断点击品类的ID是否为空,如果不为空,说明存在点击品类相关的信息,
+ // 那么就创建一个键值对,其键和值都设置为该点击品类的ID(clickCategoryId),然后将这个键值对添加到visitCategoryList列表中,
+ // 这样做方便后续统计点击品类相关的情况以及和其他品类相关操作(如下单、支付品类信息)进行整合处理。
+
+ if (row.get(8)!= null) {
+ String[] orderCategoryIdsSplited = row.getString(8).split(",");
+ // 从row数据行中获取索引为8的字段(根据代码逻辑推测这个字段存储的是下单品类的ID集合,以逗号分隔的字符串形式存在),先判断其是否为空,
+ // 如果不为空,就通过split方法以逗号作为分隔符,将这个字符串拆分成一个字符串数组orderCategoryIdsSplited,数组中的每个元素就是一个下单品类的ID(以字符串形式呈现)。
+
+ for (String orderCategoryId :
orderCategoryIdsSplited) {
- visitCategoryList.add(new Tuple2(Long.valueOf(orderCategoryId),Long.valueOf(orderCategoryId)));
+ visitCategoryList.add(new Tuple2(Long.valueOf(orderCategoryId), Long.valueOf(orderCategoryId)));
+ // 遍历拆分后的下单品类ID字符串数组orderCategoryIdsSplited,对于其中的每个字符串形式的下单品类ID(orderCategoryId),
+ // 先将其转换为Long类型,然后创建一个键值对,键和值都设置为这个转换后的Long类型的下单品类ID,
+ // 最后将这个键值对添加到visitCategoryList列表中,用于记录下单的品类相关信息,方便后续统计和分析下单品类的情况。
}
}
- if(row.get(10)!=null){
- String[] payCategoryIdsSplited=row.getString(10).split(",");
- for (String payCategoryId:
+ if (row.get(10)!= null) {
+ String[] payCategoryIdsSplited = row.getString(10).split(",");
+ // 同样地,从row数据行中获取索引为10的字段(推测这个字段存储的是支付品类的ID集合,也是以逗号分隔的字符串形式),判断其是否为空,
+ // 若不为空,使用split方法以逗号作为分隔符将其拆分成字符串数组payCategoryIdsSplited,数组中的每个元素就是一个支付品类的ID(字符串形式)。
+
+ for (String payCategoryId :
payCategoryIdsSplited) {
- visitCategoryList.add(new Tuple2(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId)));
+ visitCategoryList.add(new Tuple2(Long.valueOf(payCategoryId), Long.valueOf(payCategoryId)));
+ // 遍历支付品类ID字符串数组payCategoryIdsSplited,针对每个字符串形式的支付品类ID(payCategoryId),
+ // 将其转换为Long类型后创建一个键值对,键和值都设置为这个转换后的Long类型的支付品类ID,
+ // 再把这个键值对添加到visitCategoryList列表中,以此记录支付的品类相关信息,便于后续对支付品类情况进行统计和处理。
}
}
return visitCategoryList;
+ // 将包含了点击、下单、支付等访问品类相关键值对信息的visitCategoryList列表返回,
+ // 这些键值对将会构成新的JavaPairRDD(也就是前面定义的categoryRDD)中的元素,用于后续进一步的数据处理操作,比如去重、统计等。
}
});
- //需要去重
- categoryRDD=categoryRDD.distinct();
- //3。计算各个品类的点击,下单和支付次数
- // 3.1 计算点击品类的数量
- JavaPairRDD clickCategoryRDD = getLClickCategoryRDD(sessionId2DetailRDD);
-
- // 3.2 计算下单的品类的数量
- JavaPairRDD orderCategoryRDD= getOrderCategoryRDD(sessionId2DetailRDD);
- // 3.3 计算支付的品类的数量
- JavaPairRDD payCategoryRDD=getPayCategoryRDD(sessionId2DetailRDD);
- //4.将上述计算的三个字段进行join,注意这里是LeftOuterJoin,因为有些品类只是点击了
- JavaPairRDD categoryCountRDD=joinCategoryAndData(categoryRDD,clickCategoryRDD,orderCategoryRDD,payCategoryRDD);
- //5.自定义二次排序的key
- JavaPairRDD sortKeyCountRDD=categoryCountRDD.mapToPair(new PairFunction, CategorySortKey, String>() {
+ // 需要去重
+ categoryRDD = categoryRDD.distinct();
+// 调用categoryRDD的distinct方法对其进行去重操作。因为之前通过flatMapToPair操作提取品类ID信息时,
+// 可能存在重复的品类ID记录,去重可以保证后续基于品类ID进行的各种统计、计算及分析等操作数据的准确性,
+// 经过去重后categoryRDD中每个品类ID将是唯一的,方便后续处理步骤使用。
+
+// 3。计算各个品类的点击,下单和支付次数
+// 3.1 计算点击品类的数量
+ JavaPairRDD clickCategoryRDD = getLClickCategoryRDD(sessionId2DetailRDD);
+// 调用名为getLClickCategoryRDD的自定义方法,传入sessionId2DetailRDD作为参数,
+// 该方法的作用应该是从sessionId2DetailRDD这个包含详细会话数据的RDD中提取出与点击品类数量相关的信息,
+// 并返回一个JavaPairRDD类型的RDD(即clickCategoryRDD),其中键值对可能表示品类ID以及对应的点击次数等信息,
+// 用于后续和其他品类相关统计数据进行整合分析。
+
+// 3.2 计算下单的品类的数量
+ JavaPairRDD orderCategoryRDD = getOrderCategoryRDD(sessionId2DetailRDD);
+// 同样地,调用自定义的getOrderCategoryRDD方法,以sessionId2DetailRDD为参数,
+// 此方法旨在从给定的RDD中提取出与下单品类数量相关的数据,返回一个JavaPairRDD类型的orderCategoryRDD,
+// 其键值对大概包含了品类ID以及对应的下单次数等内容,方便后续综合统计各个品类的不同行为次数情况。
+
+// 3.3 计算支付的品类的数量
+ JavaPairRDD payCategoryRDD = getPayCategoryRDD(sessionId2DetailRDD);
+// 调用getPayCategoryRDD这个自定义方法,输入sessionId2DetailRDD,
+// 该方法负责从传入的RDD中提取出与支付品类数量相关的信息,生成并返回JavaPairRDD类型的payCategoryRDD,
+// 其中的键值对可能体现了品类ID和对应的支付次数等信息,为后续全面分析品类相关行为数据做准备。
+
+// 4.将上述计算的三个字段进行join,注意这里是LeftOuterJoin,因为有些品类只是点击了
+ JavaPairRDD categoryCountRDD = joinCategoryAndData(categoryRDD, clickCategoryRDD, orderCategoryRDD, payCategoryRDD);
+// 调用自定义的joinCategoryAndData方法,将前面已经获取到的categoryRDD(经过去重后包含品类ID的RDD)、
+// clickCategoryRDD(点击品类数量相关的RDD)、orderCategoryRDD(下单品类数量相关的RDD)以及payCategoryRDD(支付品类数量相关的RDD)作为参数传入,
+// 进行LeftOuterJoin(左外连接)操作。左外连接的意义在于,即使某些品类只有点击行为(也就是在下单和支付相关的RDD中不存在对应记录),
+// 也能在连接结果中保留这些品类的信息,确保最终得到的categoryCountRDD包含了所有品类的综合情况(点击、下单、支付次数等信息整合在一起),
+// 其返回的JavaPairRDD类型的categoryCountRDD,键可能是品类ID,值可能是整合了点击、下单、支付次数等相关信息的字符串,方便后续进一步处理。
+
+// 5.自定义二次排序的key
+ JavaPairRDD sortKeyCountRDD = categoryCountRDD.mapToPair(new PairFunction, CategorySortKey, String>() {
+ // 调用categoryCountRDD的mapToPair方法,传入一个实现了PairFunction接口的匿名内部类实例,
+ // 这个操作的目的是基于categoryCountRDD中的元素(类型为Tuple2),按照自定义的逻辑转换生成新的键值对,
+ // 新键值对的类型是Tuple2,也就是即将生成的sortKeyCountRDD的元素类型,通过自定义键的规则来为后续排序做准备。
@Override
public Tuple2 call(Tuple2 longStringTuple2) throws Exception {
- String countInfo=longStringTuple2._2;
- Long clickCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CLICK_CATEGORY));
- Long orderCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Long payCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- CategorySortKey key=new CategorySortKey();
- key.set(clickCount,orderCount,payCount);
- return new Tuple2(key,countInfo);
+ // 重写了PairFunction接口中的call方法,当执行mapToPair操作时,会针对categoryCountRDD中的每一个元素(每个Tuple2类型的键值对),
+ // 调用这个call方法来执行具体的转换逻辑,参数longStringTuple2就是当前正在处理的那个键值对元素。
+
+ String countInfo = longStringTuple2._2;
+ // 从传入的键值对longStringTuple2中获取其值部分(是一个字符串类型的数据,从变量名countInfo推测它包含了品类相关的各种统计信息,比如点击、下单、支付次数等),
+ // 并将其赋值给countInfo变量,后续将从这个字符串中提取出具体的次数信息用于构建排序的关键对象。
+
+ Long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_CATEGORY));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从countInfo字符串中提取出对应Constants.FIELD_CLICK_CATEGORY(应该是在Constants类中定义好的常量,
+ // 用于标识点击品类次数在字符串中的位置等信息)这个字段对应的内容,然后将其转换为Long类型,赋值给clickCount变量,从而获取到点击品类的次数信息。
+
+ Long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 同样利用StringUtils工具类,从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容,将其转换为Long类型,
+ // 赋值给orderCount变量,以此获取下单品类的次数信息(此处代码中两次使用了Constants.FIELD_ORDER_CATEGORY来提取字段,可能存在错误,
+ // 正常应该分别对应不同的常量来准确提取下单次数和支付次数,需结合实际Constants类定义来确认)。
+
+ Long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 再次使用相同方式从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容并转换为Long类型,赋值给payCount变量,
+ // 获取支付品类的次数信息(同样可能存在提取字段常量使用不当的问题,需检查修正)。
+
+ CategorySortKey key = new CategorySortKey();
+ key.set(clickCount, orderCount, payCount);
+ // 创建一个CategorySortKey类型的对象key,此类应该是自定义的用于封装排序关键信息的实体类,
+ // 通过调用其set方法(假设CategorySortKey类有这样的方法用于设置对象属性值),将前面获取到的点击品类次数、下单品类次数、支付品类次数等信息设置到key对象中,
+ // 以此构建出用于排序的关键对象,后续可以根据这个对象中包含的品类次数信息来决定数据的排序规则。
+
+ return new Tuple2(key, countInfo);
+ // 返回一个新的键值对,其键为构建好的CategorySortKey类型的排序关键对象key,值为原来的countInfo字符串(包含完整的品类统计信息),
+ // 这些新生成的键值对将构成新的JavaPairRDD(即前面定义的sortKeyCountRDD),用于后续按照自定义规则进行排序操作。
}
});
- JavaPairRDD sortedCategoryRDD=sortKeyCountRDD.sortByKey(false);
- //取出前10个,写入数据库
- List> top10CategoryList=sortedCategoryRDD.take(10);
- List top10Categories=new ArrayList();
- for(Tuple2 tuple2:top10CategoryList)
- {
- String countInfo=tuple2._2;
- Long categoryId=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CATEGORY_ID));
- Long clickCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_CLICK_CATEGORY));
- Long orderCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Long payCount=Long.valueOf(StringUtils.getFieldFromConcatString(countInfo,"\\|",Constants.FIELD_ORDER_CATEGORY));
- Top10Category top10Category=new Top10Category();
- top10Category.set(taskId,categoryId,clickCount,orderCount,payCount);
+
+
+
+ JavaPairRDD sortedCategoryRDD = sortKeyCountRDD.sortByKey(false);
+// 调用sortKeyCountRDD的sortByKey方法,传入参数false,表示按照键(CategorySortKey类型)进行降序排序。
+// 这里的键(CategorySortKey)包含了之前构建的与品类相关的点击、下单、支付次数等用于排序的关键信息,
+// 通过这种排序方式,可以将品类数据按照设定的规则有序排列,使得访问量等综合情况更突出的品类排在前面,方便后续取出排名靠前的品类信息,
+// 排序后的结果存储在新的JavaPairRDD类型的sortedCategoryRDD中。
+
+// 取出前10个,写入数据库
+ List> top10CategoryList = sortedCategoryRDD.take(10);
+// 调用sortedCategoryRDD的take方法,该方法会从已经排好序的sortedCategoryRDD中取出前10个元素,
+// 每个元素是一个Tuple2类型的键值对,包含了品类相关的排序关键信息(键部分)以及完整的品类统计信息(值部分),
+// 取出的这10个元素组成的列表被赋值给top10CategoryList变量,这个列表就代表了访问量等综合情况排名前10的品类相关信息,后续将基于这些信息进行数据库持久化等操作。
+
+ List top10Categories = new ArrayList();
+// 创建一个名为top10Categories的ArrayList列表,用于存放Top10Category类型的对象。
+// Top10Category应该是自定义的用于封装要持久化到数据库的前10品类详细信息的实体类,接下来的循环操作会将提取出来的相关数据封装成此类对象添加到这个列表中,
+// 以便后续进行批量插入数据库的操作。
+
+ for (Tuple2 tuple2 : top10CategoryList) {
+ // 开始遍历top10CategoryList列表,其中每个元素tuple2就是包含了排名前10品类相关信息的键值对,通过遍历可以依次提取每个品类的详细信息进行封装处理。
+ String countInfo = tuple2._2;
+ // 从当前遍历的键值对tuple2中获取其值部分(也就是包含品类详细统计信息的字符串,从前面代码逻辑可知这个字符串包含了品类ID、点击次数等多种信息),
+ // 并将其赋值给countInfo变量,后续将从这个字符串中按照特定分隔符提取出具体的各项信息,用于构建Top10Category对象。
+
+ Long categoryId = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
+ // 通过StringUtils工具类(自定义的用于字符串操作的工具类)的getFieldFromConcatString方法,
+ // 按照“|”作为分隔符从countInfo字符串中提取出对应Constants.FIELD_CATEGORY_ID(应该是在Constants类中定义好的常量,用于标识品类ID在字符串中的位置等信息)
+ // 这个字段对应的内容,然后将其转换为Long类型,赋值给categoryId变量,从而获取到当前品类的ID信息。
+
+ Long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_CLICK_CATEGORY));
+ // 同样利用StringUtils工具类,从countInfo字符串中提取对应Constants.FIELD_CLICK_CATEGORY字段对应的内容,并转换为Long类型,
+ // 赋值给clickCount变量,以此获取该品类的点击次数信息,方便后续将这些准确的业务数据封装到对象中持久化到数据库。
+
+ Long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容并转换为Long类型,赋值给orderCount变量,
+ // 这样就获取到了该品类的下单次数信息,用于构建完整的Top10Category对象,记录品类的下单行为情况。
+
+ Long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(countInfo, "\\|", Constants.FIELD_ORDER_CATEGORY));
+ // 再次使用StringUtils工具类按照相同分隔符从countInfo字符串中提取对应Constants.FIELD_ORDER_CATEGORY字段对应的内容,将其转换为Long类型,
+ // 赋值给payCount变量,获取该品类的支付次数信息(此处代码中两次使用Constants.FIELD_ORDER_CATEGORY来提取字段,可能存在错误,
+ // 正常应该分别对应不同的常量来准确提取下单次数和支付次数,需结合实际Constants类定义来确认)。
+
+ Top10Category top10Category = new Top10Category();
+ // 创建一个Top10Category类型的对象top10Category,此类是用于封装前10品类详细数据的自定义实体类,接下来将把提取到的各项品类信息设置到这个对象中。
+
+ top10Category.set(taskId, categoryId, clickCount, orderCount, payCount);
+ // 调用top10Category对象的set方法(假设Top10Category类有此方法用于设置对象的各个属性值),
+ // 将外部传入的任务ID(taskId)以及前面提取出来的品类ID、点击次数、下单次数、支付次数等信息设置到top10Category对象中,
+ // 完成对该对象的属性赋值操作,使其封装好了要持久化到数据库的前10品类的详细数据信息。
+
top10Categories.add(top10Category);
+ // 将封装好数据的top10Category对象添加到top10Categories列表中,通过循环不断添加,最终这个列表将包含所有排名前10品类的完整封装对象,
+ // 准备进行批量插入数据库的操作。
}
- //插入数据库
+
+// 插入数据库
DaoFactory.getTop10CategoryDao().batchInsert(top10Categories);
+// 通过DaoFactory(应该是自定义的数据访问工厂类,用于获取数据库操作相关的DAO对象)的getTop10CategoryDao方法,
+// 获取到用于操作Top10Category数据的DAO对象,然后调用其batchInsert方法,
+// 将包含要持久化数据的top10Categories列表传入,实现将排名前10品类的详细信息批量插入到数据库中的功能,完成数据持久化的操作流程。
+
return top10CategoryList;
- }
+// 最后将包含排名前10品类相关信息(以Tuple2类型的键值对形式存在)的top10CategoryList列表返回,
+// 方便外部代码根据需要进一步使用这些数据,比如在其他地方展示、进行关联分析等操作。
- /**
- * 将几个品类相连接
- * @param categoryRDD
- * @param clickCategoryRDD
- * @param orderCategoryRDD
- * @param payCategoryRDD
- * @return
- */
- private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) {
- JavaPairRDD>> tmpJoinRDD=categoryRDD.leftOuterJoin(clickCategoryRDD);
+ /**
+ * 将几个品类相连接
+ * @param categoryRDD
+ * @param clickCategoryRDD
+ * @param orderCategoryRDD
+ * @param payCategoryRDD
+ * @return
+ */
+ private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) {
+ // 此方法名为joinCategoryAndData,作用是将不同来源的品类相关数据(品类ID以及对应的点击、下单、支付次数等信息)进行关联整合,
+ // 接收四个JavaPairRDD类型的参数,分别是categoryRDD(包含品类ID信息)、clickCategoryRDD(点击品类次数相关信息)、
+ // orderCategoryRDD(下单品类次数相关信息)以及payCategoryRDD(支付品类次数相关信息),最终返回一个整合好数据的JavaPairRDD类型的结果。
+
+ JavaPairRDD>> tmpJoinRDD = categoryRDD.leftOuterJoin(clickCategoryRDD);
+ // 首先对categoryRDD和clickCategoryRDD进行左外连接(leftOuterJoin)操作。左外连接的特点是,以categoryRDD中的元素为基础,
+ // 对于categoryRDD中的每个品类ID,尝试在clickCategoryRDD中查找对应的点击次数信息,如果能找到,则将品类ID与对应的点击次数信息组合在一起;
+ // 如果在clickCategoryRDD中找不到对应的点击次数(即该品类可能没有点击行为),则点击次数部分会用com.google.common.base.Optional类型表示,
+ // 其中Optional用于处理可能存在或不存在的值的情况(在这里表示可能不存在点击次数的情况)。连接后的结果是一个新的JavaPairRDD,
+ // 其键依然是品类ID(Long类型),值是一个包含两个元素的Tuple2,第一个元素是品类ID(与键相同,重复出现,方便后续处理),
+ // 第二个元素是Optional类型,表示对应的点击次数(可能存在也可能不存在),这个新的RDD被赋值给tmpJoinRDD变量,用于后续进一步处理。
+
+ JavaPairRDD tmpRDD = tmpJoinRDD.mapToPair(new PairFunction>>, Long, String>() {
+ // 调用tmpJoinRDD的mapToPair方法,传入一个实现了PairFunction接口的匿名内部类实例,目的是基于tmpJoinRDD中的元素(类型为Tuple2>>),
+ // 按照自定义的逻辑转换生成新的键值对,新键值对的类型是Tuple2,也就是即将生成的tmpRDD的元素类型,这里主要是将前面连接后的复杂数据结构进行整理,提取关键信息并转换格式。
+
+ @Override
+ public Tuple2