|
|
|
@ -1,55 +1,249 @@
|
|
|
|
|
//这行代码定义了当前类所在的包名。包名用于组织和管理类。
|
|
|
|
|
package cn.edu.hust.session;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.conf 包下的 ConfigurationManager 类。这个类可能用于管理配置信息
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.conf.ConfigurationManager;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.constant 包下的 Constants 类。这个类可能包含一些常量定义。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.constant.Constants;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.dao 包下的 TaskDao 类。这个类可能用于与任务相关的数据访问操作。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.dao.TaskDao;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.dao.factory 包下的 DaoFactory 类。这个类可能用于创建 DAO 对象。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.dao.factory.DaoFactory;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.domain 包下的所有类。这个包可能包含一些数据模型类。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.domain.*;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.mockData 包下的 MockData 类。这个类可能用于生成模拟数据
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.mockData.MockData;
|
|
|
|
|
|
|
|
|
|
//导入 cn.edu.hust.util 包下的所有类。这个包可能包含一些通用工具类。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.util.*;
|
|
|
|
|
|
|
|
|
|
//导入 com.alibaba.fastjson.JSONObject 类。FastJSON 是一个用于处理 JSON 数据的库。
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.Accumulator 类。这个类用于在 Spark 作业中共享和累加数据。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.Accumulator;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.SparkConf 类。这个类用于配置 Spark 应用程序。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.SparkConf;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.SparkContext 类。这个类是 Spark 应用程序的主入口点。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.SparkContext;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.api.java.JavaPairRDD 类。
|
|
|
|
|
// 这个类是用于处理键值对的 RDD(弹性分布式数据集)
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.api.java.JavaPairRDD;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.api.java.JavaRDD 类。这个类是用于处理普通 RDD 的 Java API。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.api.java.JavaRDD;
|
|
|
|
|
|
|
|
|
|
//导入了 JavaSparkContext 类。
|
|
|
|
|
// JavaSparkContext 是 org.apache.spark.api.java 包下的一个类,提供了与 Spark 集群交互的 Java API。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.api.java.JavaSparkContext;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.api.java.function 包下的所有函数类。
|
|
|
|
|
// 这些类提供了常见的操作函数,如 FlatMapFunction, MapFunction, FilterFunction 等。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.api.java.function.*;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.sql.DataFrame 类。这个类是用于操作结构化数据的。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.sql.DataFrame;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.sql.Row 类。这个类是用于表示一行数据的。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.sql.Row;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.sql.SQLContext 类。
|
|
|
|
|
// 这个类用于创建 Spark SQL 的上下文环境
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.sql.SQLContext;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.sql.hive.HiveContext 类。
|
|
|
|
|
//这个类用于与 Hive 兼容的 Spark SQL 上下文环境
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.sql.hive.HiveContext;
|
|
|
|
|
|
|
|
|
|
//导入 org.apache.spark.storage.StorageLevel 类。
|
|
|
|
|
// 这个类定义了 Spark 中数据存储的级别。
|
|
|
|
|
|
|
|
|
|
import org.apache.spark.storage.StorageLevel;
|
|
|
|
|
|
|
|
|
|
//导入 scala.Tuple2 类。这个类用于表示一个二元组(一个包含两个元素的元组)。
|
|
|
|
|
|
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
|
|
import java.util.*;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 用户可以查询的范围包含
|
|
|
|
|
* 1。用户的职业
|
|
|
|
|
* 2。用户的性别
|
|
|
|
|
* 3。用户城市
|
|
|
|
|
* 4。用户年龄
|
|
|
|
|
* 5。获取搜索词
|
|
|
|
|
* 6。获取点击品类
|
|
|
|
|
*/
|
|
|
|
|
public class UserVisitAnalyze {
|
|
|
|
|
public static void main(String[] args) {
|
|
|
|
|
args = new String[]{"1"};
|
|
|
|
|
SparkConf conf = new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
|
|
|
|
|
JavaSparkContext context = new JavaSparkContext(conf);
|
|
|
|
|
SQLContext sc = getSQLContext(context.sc());
|
|
|
|
|
mock(context, sc);
|
|
|
|
|
TaskDao dao = DaoFactory.getTaskDao();
|
|
|
|
|
Long taskId = ParamUtils.getTaskIdFromArgs(args);
|
|
|
|
|
Task task = dao.findTaskById(taskId);
|
|
|
|
|
JSONObject jsonObject = JSONObject.parseObject(task.getTaskParam());
|
|
|
|
|
JavaRDD<Row> sessionRangeDate = getActionRDD(sc, jsonObject);
|
|
|
|
|
JavaPairRDD<String, Row> sessionInfoPairRDD = getSessonInfoPairRDD(sessionRangeDate);
|
|
|
|
|
public static void main(String[] args)
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
//初始化 args 参数数组,用于传递给 main 方法。
|
|
|
|
|
// 这里暂时设置为 {"1"},实际使用时可以根据需要调整。
|
|
|
|
|
args=new String[]{"1"};
|
|
|
|
|
/**
|
|
|
|
|
* 构建spark上下文
|
|
|
|
|
*/
|
|
|
|
|
//创建 SparkConf 对象并设置应用程序名称和运行模式(本地模式,使用3个核心)。
|
|
|
|
|
//使用 SparkConf 创建 JavaSparkContext 实例,初始化Spark上下文。
|
|
|
|
|
SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
|
|
|
|
|
JavaSparkContext context=new JavaSparkContext(conf);
|
|
|
|
|
|
|
|
|
|
//通过 JavaSparkContext 获取 SQLContext,用于执行SQL查询。
|
|
|
|
|
SQLContext sc=getSQLContext(context.sc());
|
|
|
|
|
//生成模拟数据,调用 mock 方法生成模拟数据到Spark环境中。
|
|
|
|
|
mock(context,sc);
|
|
|
|
|
|
|
|
|
|
//拿到相应的Dao组建
|
|
|
|
|
//通过 DaoFactory 获取 TaskDao 实例,用于数据库操作。
|
|
|
|
|
TaskDao dao= DaoFactory.getTaskDao();
|
|
|
|
|
|
|
|
|
|
//从外部传入的参数获取任务的id
|
|
|
|
|
//通过 args 参数获取任务ID。
|
|
|
|
|
Long taskId=ParamUtils.getTaskIdFromArgs(args);
|
|
|
|
|
|
|
|
|
|
//从数据库中查询出相应的task
|
|
|
|
|
//通过任务ID从数据库中查询任务信息,并将任务参数解析为 JSONObject
|
|
|
|
|
Task task=dao.findTaskById(taskId);
|
|
|
|
|
JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
|
|
|
|
|
|
|
|
|
|
//获取指定范围内的Sesssion
|
|
|
|
|
//调用 getActionRDD 方法获取包含指定范围内的Session的RDD。
|
|
|
|
|
JavaRDD<Row> sessionRangeDate=getActionRDD(sc,jsonObject);
|
|
|
|
|
|
|
|
|
|
//这里增加一个新的方法,主要是映射
|
|
|
|
|
//将 sessionRangeDate 转换为包含键值对的 PairRDD。
|
|
|
|
|
JavaPairRDD<String,Row> sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate);
|
|
|
|
|
|
|
|
|
|
//重复用到的RDD进行持久化
|
|
|
|
|
//将 sessionInfoPairRDD 持久化到磁盘,提高后续操作的性能
|
|
|
|
|
sessionInfoPairRDD.persist(StorageLevel.DISK_ONLY());
|
|
|
|
|
JavaPairRDD<String, String> sesssionAggregateInfoRDD = aggregateBySessionId(sc, sessionInfoPairRDD);
|
|
|
|
|
Accumulator<String> sessionAggrStatAccumulator = context.accumulator("", new SessionAggrStatAccumulator());
|
|
|
|
|
JavaPairRDD<String, String> filteredSessionRDD = filterSessionAndAggrStat(sesssionAggregateInfoRDD, jsonObject, sessionAggrStatAccumulator);
|
|
|
|
|
|
|
|
|
|
//按照Sesson进行聚合
|
|
|
|
|
//调用 aggregateBySessionId 方法对Session信息进行聚合。
|
|
|
|
|
JavaPairRDD<String,String> sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionInfoPairRDD);
|
|
|
|
|
|
|
|
|
|
//通过条件对RDD进行筛选
|
|
|
|
|
//使用Accumulator进行统计:
|
|
|
|
|
//创建一个 Accumulator 来统计聚合结果。
|
|
|
|
|
Accumulator<String> sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
|
|
|
|
|
|
|
|
|
|
//过滤和统计Session信息:
|
|
|
|
|
//在进行accumulator之前,需要aciton动作,不然会为空
|
|
|
|
|
//调用 filterSessionAndAggrStat 方法过滤并统计Session信息。
|
|
|
|
|
JavaPairRDD<String,String> filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
|
|
|
|
|
|
|
|
|
|
//重复用到的RDD进行持久化
|
|
|
|
|
//将过滤后的RDD持久化到磁盘。
|
|
|
|
|
filteredSessionRDD.persist(StorageLevel.DISK_ONLY());
|
|
|
|
|
JavaPairRDD<String, Row> commonFullClickInfoRDD = getFilterFullInfoRDD(filteredSessionRDD, sessionInfoPairRDD);
|
|
|
|
|
|
|
|
|
|
//获取符合过滤条件的全信息公共RDD
|
|
|
|
|
//调用 getFilterFullInfoRDD 方法获取包含完整信息的公共RDD。
|
|
|
|
|
JavaPairRDD<String, Row> commonFullClickInfoRDD=getFilterFullInfoRDD(filteredSessionRDD,sessionInfoPairRDD);
|
|
|
|
|
|
|
|
|
|
//重复用到的RDD进行持久化
|
|
|
|
|
//将公共RDD持久化到磁盘。
|
|
|
|
|
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
|
|
|
|
|
randomExtractSession(taskId, filteredSessionRDD, sessionInfoPairRDD);
|
|
|
|
|
calculateAndPersist(sessionAggrStatAccumulator.value(), taskId);
|
|
|
|
|
List<Tuple2<CategorySortKey, String>> top10CategoryIds = getTop10Category(taskId, commonFullClickInfoRDD);
|
|
|
|
|
getTop10Session(context, taskId, sessionInfoPairRDD, top10CategoryIds);
|
|
|
|
|
//session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 重构实现的思路:
|
|
|
|
|
* 1。不要去生成任何的新RDD
|
|
|
|
|
*
|
|
|
|
|
* 2。不要去单独遍历一遍sesion的数据
|
|
|
|
|
*
|
|
|
|
|
* 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
|
|
|
|
|
*
|
|
|
|
|
* 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
|
|
|
|
|
*
|
|
|
|
|
* 开发Spark的经验准则
|
|
|
|
|
*
|
|
|
|
|
* 1。尽量少生成RDD
|
|
|
|
|
*
|
|
|
|
|
* 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
|
|
|
|
|
*
|
|
|
|
|
* 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
|
|
|
|
|
*
|
|
|
|
|
* shuffle操作,会导致大量的磁盘读写,严重降低性能
|
|
|
|
|
* 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
|
|
|
|
|
* 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
|
|
|
|
|
*
|
|
|
|
|
* 4。无论做什么功能,性能第一
|
|
|
|
|
*
|
|
|
|
|
* 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
|
|
|
|
|
* 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
|
|
|
|
|
* 一场灾难。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 使用CountByKey算子实现随机抽取功能
|
|
|
|
|
*/
|
|
|
|
|
//调用 randomExtractSession 方法实现随机抽取功能。
|
|
|
|
|
randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
|
|
|
|
|
|
|
|
|
|
//在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算
|
|
|
|
|
//filteredSessionRDD.count();
|
|
|
|
|
//计算各个session占比,并写入MySQL
|
|
|
|
|
|
|
|
|
|
//调用 calculateAndPersist 方法计算并持久化聚合统计结果。
|
|
|
|
|
calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
|
|
|
|
|
|
|
|
|
|
//调用 getTop10Category 方法获取热门品类数据Top10
|
|
|
|
|
List<Tuple2<CategorySortKey,String>> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
|
|
|
|
|
|
|
|
|
|
//调用 getTop10Session 方法获取热门品类点击Top10Session。
|
|
|
|
|
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
|
|
|
|
|
|
|
|
|
|
//关闭Spark上下文,释放资源
|
|
|
|
|
context.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 功能总结
|
|
|
|
|
* 配置和初始化:配置Spark环境并初始化Spark上下文。
|
|
|
|
|
* 生成模拟数据:生成模拟数据到Spark环境中。
|
|
|
|
|
* 数据库操作:从数据库中获取任务信息。
|
|
|
|
|
* 数据处理:读取和处理Session数据,进行聚合、过滤、统计等操作。
|
|
|
|
|
* 持久化:将中间结果持久化以提高性能。
|
|
|
|
|
* 统计和汇总:计算并汇总统计数据。
|
|
|
|
|
* 随机抽取和输出:实现随机抽取功能并输出结果。
|
|
|
|
|
* 关闭资源:关闭Spark上下文,释放资源。
|
|
|
|
|
* 通过这些步骤,可以完成用户访问分析的整个流程。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|