|
|
|
@ -616,8 +616,17 @@ public class UserVisitAnalyze {
|
|
|
|
|
* @param sessionAggrStatAccumulator
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
//方法签名
|
|
|
|
|
//方法 filterSessionAndAggrStat 接受三个参数:
|
|
|
|
|
//sessionInfoRDD:一个 JavaPairRDD,包含会话ID和会话信息。
|
|
|
|
|
//taskParam:一个 JSONObject,包含筛选条件。
|
|
|
|
|
//sessionAggrStatAccumulator:一个 Accumulator,用于累加统计信息。
|
|
|
|
|
private static JavaPairRDD<String,String> filterSessionAndAggrStat(JavaPairRDD<String, String> sessionInfoRDD, final JSONObject taskParam, final Accumulator<String> sessionAggrStatAccumulator){
|
|
|
|
|
//得到条件
|
|
|
|
|
//从任务参数中提取筛选条件
|
|
|
|
|
//从 taskParam 中提取不同的筛选条件,
|
|
|
|
|
//如年龄范围、职业、城市、性别、搜索关键词和点击类别ID。
|
|
|
|
|
String startAge=ParamUtils.getParam(taskParam,Constants.PARAM_STARTAGE);
|
|
|
|
|
String endAge=ParamUtils.getParam(taskParam,Constants.PARAM_ENDAGE);
|
|
|
|
|
String professionals=ParamUtils.getParam(taskParam,Constants.PARAM_PROFESSONALS);
|
|
|
|
@ -627,44 +636,60 @@ public class UserVisitAnalyze {
|
|
|
|
|
String categoryIds=ParamUtils.getParam(taskParam,Constants.PARAM_CLICK_CATEGORYIDS);
|
|
|
|
|
|
|
|
|
|
//拼接参数
|
|
|
|
|
//将提取的筛选条件拼接成一个字符串 _paramter,方便后续进行条件判断。
|
|
|
|
|
String _paramter=(startAge!=null?Constants.PARAM_STARTAGE+"="+startAge+"|":"")+
|
|
|
|
|
(endAge!=null?Constants.PARAM_ENDAGE+"="+endAge+"|":"")+(professionals!=null?Constants.PARAM_PROFESSONALS+"="+professionals+"|":"")+
|
|
|
|
|
(cities!=null?Constants.PARAM_CIYTIES+"="+cities+"|":"")+(sex!=null?Constants.PARAM_SEX+"="+sex+"|":"")+
|
|
|
|
|
(keyWords!=null?Constants.PARAM_SERACH_KEYWORDS+"="+keyWords+"|":"")+(categoryIds!=null?Constants.PARAM_CLICK_CATEGORYIDS+"="+categoryIds+"|":"");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//去除末尾多余的 |
|
|
|
|
|
//确保参数格式正确。
|
|
|
|
|
if(_paramter.endsWith("\\|"))
|
|
|
|
|
_paramter=_paramter.substring(0,_paramter.length()-1);
|
|
|
|
|
|
|
|
|
|
//定义最终参数
|
|
|
|
|
//使用 final 关键字定义最终参数 paramter,确保在 filter 函数中可以使用。
|
|
|
|
|
final String paramter=_paramter;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//过滤和统计会话数据
|
|
|
|
|
JavaPairRDD<String,String> filteredSessionRDD=sessionInfoRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Boolean call(Tuple2<String, String> tuple2) throws Exception {
|
|
|
|
|
String sessionInfo=tuple2._2;
|
|
|
|
|
//按照条件进行过滤
|
|
|
|
|
//
|
|
|
|
|
//按照年龄进行过滤
|
|
|
|
|
if(!ValidUtils.between(sessionInfo,Constants.FIELD_AGE,paramter,Constants.PARAM_STARTAGE,Constants.PARAM_ENDAGE))
|
|
|
|
|
return false;
|
|
|
|
|
//
|
|
|
|
|
//按照职业进行过滤
|
|
|
|
|
if(!ValidUtils.in(sessionInfo,Constants.FIELD_PROFESSIONAL,paramter,Constants.PARAM_PROFESSONALS))
|
|
|
|
|
return false;
|
|
|
|
|
//
|
|
|
|
|
//按照城市进行过滤
|
|
|
|
|
if(!ValidUtils.in(sessionInfo,Constants.FIELD_CITY,paramter,Constants.PARAM_CIYTIES))
|
|
|
|
|
return false;
|
|
|
|
|
//
|
|
|
|
|
//按照性别进行筛选
|
|
|
|
|
if(!ValidUtils.equal(sessionInfo,Constants.FIELD_SEX,paramter,Constants.PARAM_SEX))
|
|
|
|
|
return false;
|
|
|
|
|
//
|
|
|
|
|
//按照搜索词进行过滤,只要有一个搜索词即可
|
|
|
|
|
if(!ValidUtils.in(sessionInfo,Constants.FIELD_SERACH_KEYWORDS,paramter,Constants.PARAM_PROFESSONALS))
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
if(!ValidUtils.in(sessionInfo,Constants.FIELD_CLICK_CATEGORYIDS,paramter,Constants.FIELD_CLICK_CATEGORYIDS))
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
//如果经过了之前的所有的过滤条件,也就是满足用户筛选条件
|
|
|
|
|
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
|
|
|
|
|
|
|
|
|
|
//计算出访问时长和访问步长的范围并进行相应的累加
|
|
|
|
|
Long visitLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_VISIT_LENGTH));
|
|
|
|
|
Long stepLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_STEP_LENGTH));
|
|
|
|
|
|
|
|
|
|
//使用函数进行统计
|
|
|
|
|
calculateVisitLength(visitLength);
|
|
|
|
|
calculateStepLength(stepLength);
|
|
|
|
@ -672,6 +697,8 @@ public class UserVisitAnalyze {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//统计访问时长的数量
|
|
|
|
|
//从会话信息中提取访问时长 visitLength。
|
|
|
|
|
//根据 visitLength 的范围进行统计,累加到 sessionAggrStatAccumulator 中。
|
|
|
|
|
private void calculateVisitLength(Long visitLegth)
|
|
|
|
|
{
|
|
|
|
|
if(visitLegth>=1&&visitLegth<=3)
|
|
|
|
@ -693,7 +720,10 @@ public class UserVisitAnalyze {
|
|
|
|
|
else if(visitLegth>1800)
|
|
|
|
|
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//统计访问步长的数量
|
|
|
|
|
//从会话信息中提取访问步长 stepLength。
|
|
|
|
|
//根据 stepLength 的范围进行统计,累加到 sessionAggrStatAccumulator 中。
|
|
|
|
|
private void calculateStepLength(Long stepLength)
|
|
|
|
|
{
|
|
|
|
|
if(stepLength>=1&&stepLength<=3)
|
|
|
|
@ -710,8 +740,28 @@ public class UserVisitAnalyze {
|
|
|
|
|
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
//返回过滤后的会话数据 filteredSessionRDD。
|
|
|
|
|
return filteredSessionRDD;
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 功能总结
|
|
|
|
|
* 功能:该方法用于过滤和统计用户会话数据。根据传入的任务参数(如年龄、职业、城市、性别、搜索关键词和点击类别ID),对会话数据进行筛选,并统计访问时长和访问步长的范围。
|
|
|
|
|
* 关键步骤:
|
|
|
|
|
* 提取筛选条件:从 taskParam 中提取筛选条件。
|
|
|
|
|
* 拼接参数:将筛选条件拼接成一个字符串 _paramter。
|
|
|
|
|
* 过滤会话数据:使用 filter 方法对会话数据进行筛选。
|
|
|
|
|
* 统计访问时长:根据访问时长进行范围统计。
|
|
|
|
|
* 统计访问步长:根据访问步长进行范围统计。
|
|
|
|
|
* 返回结果:返回过滤后的会话数据 filteredSessionRDD。
|
|
|
|
|
* 通过这些步骤,可以筛选出符合特定条件的会话数据,并进行访问时长和访问步长的统计,为后续的数据分析提供基础。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
@ -720,16 +770,40 @@ public class UserVisitAnalyze {
|
|
|
|
|
* @param sessionInfoPairRDD
|
|
|
|
|
* @return
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
//定义一个静态方法 getFilterFullInfoRDD,该方法接受两个参数:
|
|
|
|
|
//filteredSessionRDD:一个 JavaPairRDD<String, String>,包含过滤后的会话信息。
|
|
|
|
|
//sessionInfoPairRDD:一个 JavaPairRDD<String, Row>,
|
|
|
|
|
// 包含完整的会话信息(会话ID和包含详细信息的 Row 对象)。
|
|
|
|
|
private static JavaPairRDD<String, Row> getFilterFullInfoRDD(JavaPairRDD<String, String> filteredSessionRDD, JavaPairRDD<String, Row> sessionInfoPairRDD) {
|
|
|
|
|
//1.获取符合条件的session范围的所有品类
|
|
|
|
|
//使用 filteredSessionRDD 和 sessionInfoPairRDD 进行 join 操作,
|
|
|
|
|
// 将两个 RDD 中的相同 key(会话ID)的数据连接在一起。
|
|
|
|
|
//使用 mapToPair 方法对连接后的结果进行转换,生成新的 JavaPairRDD。
|
|
|
|
|
return filteredSessionRDD.join(sessionInfoPairRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<String, Row> call(Tuple2<String, Tuple2<String, Row>> stringTuple2Tuple2) throws Exception {
|
|
|
|
|
|
|
|
|
|
//从 Tuple2<String, Tuple2<String, Row>> 中提取会话ID stringTuple2Tuple2._1
|
|
|
|
|
// 和完整的 Row 对象 stringTuple2Tuple2._2._2。
|
|
|
|
|
//将会话ID和完整的 Row 对象封装为一个新的 Tuple2<String, Row> 并返回。
|
|
|
|
|
return new Tuple2<String, Row>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._2);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 功能总结
|
|
|
|
|
* 功能:该方法用于获取符合条件的会话范围的所有品类的详细信息。具体步骤如下:
|
|
|
|
|
* 连接两个 RDD:使用 join 方法将 filteredSessionRDD 和 sessionInfoPairRDD 中的相同会话ID的数据连接在一起。
|
|
|
|
|
* 提取详细信息:通过 mapToPair 方法提取每个连接后的 Tuple2 中的会话ID和完整的 Row 对象。
|
|
|
|
|
* 返回结果:返回一个新的 JavaPairRDD<String, Row>,其中每个 Row 对象包含了完整的会话信息。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 随机抽取Sesison功能
|
|
|
|
|
* @param taskId
|
|
|
|
|