|
|
|
@ -323,27 +323,82 @@ public class UserVisitAnalyze {
|
|
|
|
|
* @param taskParam
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//getActionRDD 方法
|
|
|
|
|
private static JavaRDD<Row> getActionRDD(SQLContext sc, JSONObject taskParam)
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
//获取开始时间和结束时间:
|
|
|
|
|
//从 taskParam 中获取开始时间 startTime 和结束时间 endTime。
|
|
|
|
|
// ParamUtils.getParam 方法用于从 JSONObject 中获取参数值。
|
|
|
|
|
String startTime=ParamUtils.getParam(taskParam,Constants.PARAM_STARTTIME);
|
|
|
|
|
String endTime=ParamUtils.getParam(taskParam,Constants.PARAM_ENDTIME);
|
|
|
|
|
|
|
|
|
|
//构建SQL查询语句
|
|
|
|
|
// 查询表 user_visit_action 中 date 字段在 startTime 和 endTime 之间的所有记录。
|
|
|
|
|
String sql="select *from user_visit_action where date>='"+startTime+"' and date<='"+endTime+"'";
|
|
|
|
|
|
|
|
|
|
//执行SQL查询并获取DataFrame:
|
|
|
|
|
//使用 SQLContext 执行SQL查询,获取查询结果的 DataFrame。
|
|
|
|
|
DataFrame df=sc.sql(sql);
|
|
|
|
|
|
|
|
|
|
//将DataFrame转换为JavaRDD<Row>
|
|
|
|
|
//方便后续的RDD操作。
|
|
|
|
|
return df.javaRDD();
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 功能解释
|
|
|
|
|
* SQL查询:通过SQL查询从数据库中获取特定时间段内的用户访问数据。
|
|
|
|
|
* 数据转换:将查询结果的 DataFrame 转换为 JavaRDD<Row>,以便进行后续的数据处理操作。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 将数据进行映射成为Pair,键为SessionId,Value为Row
|
|
|
|
|
* @param sessionRangeDate
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
private static JavaPairRDD<String,Row> getSessonInfoPairRDD(JavaRDD<Row> sessionRangeDate) {
|
|
|
|
|
return sessionRangeDate.mapToPair(new PairFunction<Row, String, Row>() {
|
|
|
|
|
//getSessonInfoPairRDD 方法
|
|
|
|
|
|
|
|
|
|
//定义方法签名:
|
|
|
|
|
//定义一个静态方法 getSessonInfoPairRDD,该方法接受一个 JavaRDD<Row> 类型的参数 sessionRangeDate,
|
|
|
|
|
// 并返回一个 JavaPairRDD<String, Row>。
|
|
|
|
|
private static JavaPairRDD<String,Row> getSessonInfoPairRDD(JavaRDD<Row> sessionRangeDate) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//使用 mapToPair 方法:
|
|
|
|
|
//使用 mapToPair 方法将 JavaRDD<Row> 转换为 JavaPairRDD<String, Row>。
|
|
|
|
|
// mapToPair 方法接受一个 PairFunction 实现类,
|
|
|
|
|
// 该实现类用于将每个 Row 对象转换为一个键值对 Tuple2<String, Row>。
|
|
|
|
|
return sessionRangeDate.mapToPair(new PairFunction<Row, String, Row>() {
|
|
|
|
|
@Override
|
|
|
|
|
//实现 PairFunction:
|
|
|
|
|
//实现 PairFunction 接口的 call 方法。
|
|
|
|
|
// call 方法会接受一个 Row 对象作为输入,并返回一个 Tuple2<String, Row>。
|
|
|
|
|
public Tuple2<String, Row> call(Row row) throws Exception {
|
|
|
|
|
|
|
|
|
|
//构造 Tuple2 对象:
|
|
|
|
|
//从 Row 对象中提取指定列的值(这里是第3列),
|
|
|
|
|
// 作为键 String,并将整个 Row 对象作为值 Row,返回一个 Tuple2<String, Row>。
|
|
|
|
|
return new Tuple2<String, Row>(row.getString(2),row);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
/**
|
|
|
|
|
* 功能解释
|
|
|
|
|
* 映射操作:使用 mapToPair 方法将每个 Row 对象转换为一个键值对 Tuple2<String, Row>。具体来说,
|
|
|
|
|
* 键是 Row 对象中的第3列的值,值是整个 Row 对象。
|
|
|
|
|
* 生成键值对:通过提取 Row 对象中的特定字段生成键值对,方便后续的聚合和处理操作。
|
|
|
|
|
* getSessonInfoPairRDD 方法:将 JavaRDD<Row> 转换为 JavaPairRDD<String, Row>,
|
|
|
|
|
* 其中键为 Row 对象中的 sessionId,
|
|
|
|
|
* 值为整个 Row 对象。
|
|
|
|
|
* 这有助于后续的聚合和处理操作,例如按 sessionId 分组统计或其他复杂操作。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -352,6 +407,12 @@ public class UserVisitAnalyze {
|
|
|
|
|
* @param sessionInfoPairRDD
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
//方法签名:
|
|
|
|
|
//定义一个静态方法 aggregateBySessionId,
|
|
|
|
|
// 该方法接受一个 SQLContext
|
|
|
|
|
// 和一个 JavaPairRDD<String, Row> 类型的参数 sessionInfoPairRDD,
|
|
|
|
|
// 并返回一个 JavaPairRDD<String, String>。
|
|
|
|
|
private static JavaPairRDD<String,String> aggregateBySessionId(SQLContext sc, JavaPairRDD<String, Row> sessionInfoPairRDD) {
|
|
|
|
|
/**
|
|
|
|
|
* 先将数据映射成map格式
|
|
|
|
@ -368,8 +429,15 @@ public class UserVisitAnalyze {
|
|
|
|
|
/**
|
|
|
|
|
* 根据sessionId进行分组
|
|
|
|
|
*/
|
|
|
|
|
//根据 sessionId 进行分组:
|
|
|
|
|
//使用 groupByKey 方法将 JavaPairRDD<String, Row> 按 sessionId 分组,
|
|
|
|
|
// 得到一个 JavaPairRDD<String, Iterable<Row>>,
|
|
|
|
|
// 其中键为 sessionId,值为具有相同 sessionId 的所有 Row 对象的迭代器。
|
|
|
|
|
JavaPairRDD<String,Iterable<Row>> sessionActionGrouped=sessionInfoPairRDD.groupByKey();
|
|
|
|
|
|
|
|
|
|
//映射生成键值对:
|
|
|
|
|
//使用 mapToPair 方法将每个 Tuple2<String, Iterable<Row>> 转换为一个 Tuple2<Long, String>。
|
|
|
|
|
// 具体来说,键为 Row 对象中的 userId,值为生成的字符串信息。
|
|
|
|
|
JavaPairRDD<Long,String> sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> stringIterableTuple2) throws Exception {
|
|
|
|
|