|
|
|
@ -174,42 +174,79 @@ public class UserVisitAnalyze {
|
|
|
|
|
commonFullClickInfoRDD.persist(StorageLevel.DISK_ONLY());
|
|
|
|
|
//session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 重构实现的思路:
|
|
|
|
|
* 1。不要去生成任何的新RDD
|
|
|
|
|
*
|
|
|
|
|
* 2。不要去单独遍历一遍sesion的数据
|
|
|
|
|
*
|
|
|
|
|
* 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
|
|
|
|
|
*
|
|
|
|
|
* 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
|
|
|
|
|
*
|
|
|
|
|
* 开发Spark的经验准则
|
|
|
|
|
*
|
|
|
|
|
* 1。尽量少生成RDD
|
|
|
|
|
*
|
|
|
|
|
* 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
|
|
|
|
|
*
|
|
|
|
|
* 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
|
|
|
|
|
*
|
|
|
|
|
* shuffle操作,会导致大量的磁盘读写,严重降低性能
|
|
|
|
|
* 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
|
|
|
|
|
* 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
|
|
|
|
|
*
|
|
|
|
|
* 4。无论做什么功能,性能第一
|
|
|
|
|
*
|
|
|
|
|
* 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
|
|
|
|
|
* 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
|
|
|
|
|
* 一场灾难。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 使用CountByKey算子实现随机抽取功能
|
|
|
|
|
*/
|
|
|
|
|
//调用 randomExtractSession 方法实现随机抽取功能。
|
|
|
|
|
randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD);
|
|
|
|
|
|
|
|
|
|
//在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算
|
|
|
|
|
//filteredSessionRDD.count();
|
|
|
|
|
//计算各个session占比,并写入MySQL
|
|
|
|
|
|
|
|
|
|
//调用 calculateAndPersist 方法计算并持久化聚合统计结果。
|
|
|
|
|
calculateAndPersist(sessionAggrStatAccumulator.value(),taskId);
|
|
|
|
|
//获取热门品类数据Top10
|
|
|
|
|
|
|
|
|
|
//调用 getTop10Category 方法获取热门品类数据Top10
|
|
|
|
|
List<Tuple2<CategorySortKey,String>> top10CategoryIds=getTop10Category(taskId,commonFullClickInfoRDD);
|
|
|
|
|
//获取热门每一个品类点击Top10session
|
|
|
|
|
|
|
|
|
|
//调用 getTop10Session 方法获取热门品类点击Top10Session。
|
|
|
|
|
getTop10Session(context,taskId,sessionInfoPairRDD,top10CategoryIds);
|
|
|
|
|
//关闭spark上下文
|
|
|
|
|
|
|
|
|
|
//关闭Spark上下文,释放资源
|
|
|
|
|
context.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 功能总结
|
|
|
|
|
* 配置和初始化:配置Spark环境并初始化Spark上下文。
|
|
|
|
|
* 生成模拟数据:生成模拟数据到Spark环境中。
|
|
|
|
|
* 数据库操作:从数据库中获取任务信息。
|
|
|
|
|
* 数据处理:读取和处理Session数据,进行聚合、过滤、统计等操作。
|
|
|
|
|
* 持久化:将中间结果持久化以提高性能。
|
|
|
|
|
* 统计和汇总:计算并汇总统计数据。
|
|
|
|
|
* 随机抽取和输出:实现随机抽取功能并输出结果。
|
|
|
|
|
* 关闭资源:关闭Spark上下文,释放资源。
|
|
|
|
|
* 通过这些步骤,可以完成用户访问分析的整个流程。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -218,25 +255,67 @@ public class UserVisitAnalyze {
|
|
|
|
|
* @param sc
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
//getSQLContext 方法
|
|
|
|
|
public static SQLContext getSQLContext(SparkContext sc)
|
|
|
|
|
{
|
|
|
|
|
//通过 ConfigurationManager 获取配置项 SPARK_LOCAL,判断是否运行在本地模式下。
|
|
|
|
|
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
|
|
|
|
|
|
|
|
|
|
//判断是否为本地模式:
|
|
|
|
|
//如果是本地模式,则返回一个新的 SQLContext 实例。
|
|
|
|
|
// SQLContext 用于执行传统的SQL查询。
|
|
|
|
|
if(local)
|
|
|
|
|
{
|
|
|
|
|
return new SQLContext(sc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//返回HiveContext:
|
|
|
|
|
//如果不是本地模式,则返回一个新的 HiveContext 实例。
|
|
|
|
|
// HiveContext 用于与Hive集成,执行Hive查询。
|
|
|
|
|
return new HiveContext(sc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 功能解释
|
|
|
|
|
* 本地模式与Hive模式:此方法根据配置项 SPARK_LOCAL 的值来决定返回 SQLContext 还是 HiveContext。SPARK_LOCAL 为 true 时,
|
|
|
|
|
* 返回 SQLContext,用于执行传统的SQL查询;
|
|
|
|
|
* SPARK_LOCAL 为 false 时,返回 HiveContext,用于与Hive集成,执行Hive查询。
|
|
|
|
|
* 配置管理:使用 ConfigurationManager 来获取配置项,
|
|
|
|
|
* 确保配置项的灵活性和可维护性。
|
|
|
|
|
* ConfigurationManager.getBoolean 方法可以方便地获取布尔类型的配置项。
|
|
|
|
|
* @param context
|
|
|
|
|
* @param sc
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//mock 方法
|
|
|
|
|
private static void mock(JavaSparkContext context,SQLContext sc)
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
//获取配置项:
|
|
|
|
|
//通过 ConfigurationManager
|
|
|
|
|
// 获取配置项 SPARK_LOCAL,判断是否运行在本地模式下。
|
|
|
|
|
boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
|
|
|
|
|
|
|
|
|
|
//判断是否为本地模式:
|
|
|
|
|
//如果是本地模式,则调用 MockData.mock 方法生成模拟数据。
|
|
|
|
|
if(local)
|
|
|
|
|
{
|
|
|
|
|
MockData.mock(context,sc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 功能解释
|
|
|
|
|
* 模拟数据生成:此方法根据配置项 SPARK_LOCAL 的值来决定是否生成模拟数据。
|
|
|
|
|
* SPARK_LOCAL 为 true 时,调用 MockData.mock 方法生成模拟数据到Spark环境中。
|
|
|
|
|
* 模拟数据:MockData.mock 方法的具体实现细节没有显示,
|
|
|
|
|
* 但通常用于生成模拟的RDD或DataFrame,以便在开发或测试阶段使用。
|
|
|
|
|
* 这有助于避免实际数据处理过程中可能出现的问题,同时便于进行单元测试和调试。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取指定日期范围内的数据
|
|
|
|
|