From 0af0f9bd612db554bb03efbafbd84ac29c145464 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=8A=9D=E9=85=92=E5=8D=83=E7=99=BE?= <3489241516@qq.com>
Date: Mon, 16 Dec 2024 21:58:42 +0800
Subject: [PATCH] =?UTF-8?q?1.=E7=8E=AF=E5=A2=83=E6=90=AD=E5=BB=BA=E5=AE=8C?=
=?UTF-8?q?=E6=88=9011=202.=E7=9B=B8=E5=85=B3=E7=9A=84=E5=B7=A5=E5=85=B7?=
=?UTF-8?q?=E7=B1=BB=E7=BC=96=E5=86=99=E5=AE=8C=E6=88=9011=203.=E9=85=8D?=
=?UTF-8?q?=E7=BD=AE=E6=96=87=E4=BB=B6=E7=AE=A1=E7=90=86=E7=B1=BB=E7=BC=96?=
=?UTF-8?q?=E5=86=99=E5=AE=8C=E6=88=9011?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/workspace.xml | 16 ++-
.../cn/edu/hust/session/UserVisitAnalyze.java | 102 +++++++++++++++++-
2 files changed, 112 insertions(+), 6 deletions(-)
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 993cb77..492df0f 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,7 +4,10 @@
-
+
+
+
+
@@ -474,6 +477,7 @@
+
1529592741848
@@ -537,7 +541,15 @@
1734354900465
-
+
+
+ 1734354969705
+
+
+
+ 1734354969705
+
+
diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
index ae867a7..ff804ca 100644
--- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
+++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
@@ -1,25 +1,94 @@
+//这行代码定义了当前类所在的包名。包名用于组织和管理类。
package cn.edu.hust.session;
+//导入 cn.edu.hust.conf 包下的 ConfigurationManager 类。这个类可能用于管理配置信息
+
import cn.edu.hust.conf.ConfigurationManager;
+
+//导入 cn.edu.hust.constant 包下的 Constants 类。这个类可能包含一些常量定义。
+
import cn.edu.hust.constant.Constants;
+
+//导入 cn.edu.hust.dao 包下的 TaskDao 类。这个类可能用于与任务相关的数据访问操作。
+
import cn.edu.hust.dao.TaskDao;
+
+//导入 cn.edu.hust.dao.factory 包下的 DaoFactory 类。这个类可能用于创建 DAO 对象。
+
import cn.edu.hust.dao.factory.DaoFactory;
+
+//导入 cn.edu.hust.domain 包下的所有类。这个包可能包含一些数据模型类。
+
import cn.edu.hust.domain.*;
+
+//导入 cn.edu.hust.mockData 包下的 MockData 类。这个类可能用于生成模拟数据
+
import cn.edu.hust.mockData.MockData;
+
+//导入 cn.edu.hust.util 包下的所有类。这个包可能包含一些通用工具类。
+
import cn.edu.hust.util.*;
+
+//导入 com.alibaba.fastjson.JSONObject 类。FastJSON 是一个用于处理 JSON 数据的库。
+
import com.alibaba.fastjson.JSONObject;
+
+//导入 org.apache.spark.Accumulator 类。这个类用于在 Spark 作业中共享和累加数据。
+
import org.apache.spark.Accumulator;
+
+//导入 org.apache.spark.SparkConf 类。这个类用于配置 Spark 应用程序。
+
import org.apache.spark.SparkConf;
+
+//导入 org.apache.spark.SparkContext 类。这个类是 Spark 应用程序的主入口点。
+
import org.apache.spark.SparkContext;
+
+//导入 org.apache.spark.api.java.JavaPairRDD 类。
+// 这个类是用于处理键值对的 RDD(弹性分布式数据集)
+
import org.apache.spark.api.java.JavaPairRDD;
+
+//导入 org.apache.spark.api.java.JavaRDD 类。这个类是用于处理普通 RDD 的 Java API。
+
import org.apache.spark.api.java.JavaRDD;
+
+//导入了 JavaSparkContext 类。
+// JavaSparkContext 是 org.apache.spark.api.java 包下的一个类,提供了与 Spark 集群交互的 Java API。
+
import org.apache.spark.api.java.JavaSparkContext;
+
+//导入 org.apache.spark.api.java.function 包下的所有函数类。
+// 这些类提供了常见的操作函数,如 FlatMapFunction, MapFunction, FilterFunction 等。
+
import org.apache.spark.api.java.function.*;
+
+//导入 org.apache.spark.sql.DataFrame 类。这个类是用于操作结构化数据的。
+
import org.apache.spark.sql.DataFrame;
+
+//导入 org.apache.spark.sql.Row 类。这个类是用于表示一行数据的。
+
import org.apache.spark.sql.Row;
+
+//导入 org.apache.spark.sql.SQLContext 类。
+// 这个类用于创建 Spark SQL 的上下文环境
+
import org.apache.spark.sql.SQLContext;
+
+//导入 org.apache.spark.sql.hive.HiveContext 类。
+//这个类用于与 Hive 兼容的 Spark SQL 上下文环境
+
import org.apache.spark.sql.hive.HiveContext;
+
+//导入 org.apache.spark.storage.StorageLevel 类。
+// 这个类定义了 Spark 中数据存储的级别。
+
import org.apache.spark.storage.StorageLevel;
+
+//导入 scala.Tuple2 类。这个类用于表示一个二元组(一个包含两个元素的元组)。
+
import scala.Tuple2;
import java.util.*;
@@ -36,50 +105,75 @@ import java.util.*;
public class UserVisitAnalyze {
public static void main(String[] args)
{
+
+ //初始化 args 参数数组,用于传递给 main 方法。
+ // 这里暂时设置为 {"1"},实际使用时可以根据需要调整。
args=new String[]{"1"};
/**
* 构建spark上下文
*/
+ //创建 SparkConf 对象并设置应用程序名称和运行模式(本地模式,使用3个核心)。
+ //使用 SparkConf 创建 JavaSparkContext 实例,初始化Spark上下文。
SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
JavaSparkContext context=new JavaSparkContext(conf);
+
+ //通过 JavaSparkContext 获取 SQLContext,用于执行SQL查询。
SQLContext sc=getSQLContext(context.sc());
- //生成模拟数据
+ //生成模拟数据,调用 mock 方法生成模拟数据到Spark环境中。
mock(context,sc);
//拿到相应的Dao组建
+ //通过 DaoFactory 获取 TaskDao 实例,用于数据库操作。
TaskDao dao= DaoFactory.getTaskDao();
+
//从外部传入的参数获取任务的id
+ //通过 args 参数获取任务ID。
Long taskId=ParamUtils.getTaskIdFromArgs(args);
+
//从数据库中查询出相应的task
+ //通过任务ID从数据库中查询任务信息,并将任务参数解析为 JSONObject
Task task=dao.findTaskById(taskId);
JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
//获取指定范围内的Sesssion
+ //调用 getActionRDD 方法获取包含指定范围内的Session的RDD。
JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject);
//这里增加一个新的方法,主要是映射
+ //将 sessionRangeDate 转换为包含键值对的 PairRDD。
JavaPairRDD sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
+
//重复用到的RDD进行持久化
+ //将 sessionInfoPairRDD 持久化到磁盘,提高后续操作的性能
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
- //上面的两个RDD是
+
//按照Sesson进行聚合
+ //调用 aggregateBySessionId 方法对Session信息进行聚合。
JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
//通过条件对RDD进行筛选
- // 重构,同时统计
+ //使用Accumulator进行统计:
+ //创建一个 Accumulator 来统计聚合结果。
Accumulator sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
-
+ //过滤和统计Session信息:
//在进行accumulator之前,需要aciton动作,不然会为空
+ //调用 filterSessionAndAggrStat 方法过滤并统计Session信息。
JavaPairRDD filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
+
//重复用到的RDD进行持久化
+ //将过滤后的RDD持久化到磁盘。
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
+
//获取符合过滤条件的全信息公共RDD
+ //调用 getFilterFullInfoRDD 方法获取包含完整信息的公共RDD。
JavaPairRDD commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
//重复用到的RDD进行持久化
+ //将公共RDD持久化到磁盘。
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
//session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
+
/**
* 重构实现的思路:
* 1。不要去生成任何的新RDD