|
|
|
@ -441,6 +441,10 @@ public class UserVisitAnalyze {
|
|
|
|
|
JavaPairRDD<Long,String> sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> stringIterableTuple2) throws Exception {
|
|
|
|
|
|
|
|
|
|
//提取关键信息:
|
|
|
|
|
//从 Iterable<Row> 中提取 sessionId 和 rows。
|
|
|
|
|
//初始化一些变量,用于存储搜索关键字、点击类别ID、用户ID、开始时间、结束时间和步骤长度。
|
|
|
|
|
String sessionId=stringIterableTuple2._1;
|
|
|
|
|
Iterable<Row> rows=stringIterableTuple2._2;
|
|
|
|
|
StringBuffer searchKeywords=new StringBuffer();
|
|
|
|
@ -449,19 +453,27 @@ public class UserVisitAnalyze {
|
|
|
|
|
Date startTime=null;
|
|
|
|
|
Date endTime=null;
|
|
|
|
|
int stepLength=0;
|
|
|
|
|
|
|
|
|
|
//遍历每一行 Row 对象:
|
|
|
|
|
//遍历每个 Row 对象,如果 userId 为空,则将其设置为当前的 userId。
|
|
|
|
|
//提取 searchKeyword 和 clickCategoryId。
|
|
|
|
|
for (Row row:rows)
|
|
|
|
|
{
|
|
|
|
|
if(userId==null)
|
|
|
|
|
userId=row.getLong(1);
|
|
|
|
|
String searchKeyword=row.getString(5);
|
|
|
|
|
Long clickCategoryId=row.getLong(6);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//判断是否需要拼接
|
|
|
|
|
//拼接搜索关键字和点击类别ID:
|
|
|
|
|
//检查 searchKeyword 是否为空并拼接到 searchKeywords 中。
|
|
|
|
|
if(StringUtils.isNotEmpty(searchKeyword))
|
|
|
|
|
{
|
|
|
|
|
if(!searchKeywords.toString().contains(searchKeyword))
|
|
|
|
|
searchKeywords.append(searchKeyword+",");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//检查 clickCategoryId 是否为 null 并拼接到 clickCategoryIds 中。
|
|
|
|
|
if(clickCategoryId!=null)
|
|
|
|
|
{
|
|
|
|
|
if(!clickCategoryId.toString().contains(String.valueOf(clickCategoryId)))
|
|
|
|
@ -469,6 +481,9 @@ public class UserVisitAnalyze {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//计算session开始时间和结束时间
|
|
|
|
|
//从 Row 对象中提取 actionTime。
|
|
|
|
|
//更新 startTime 和 endTime,确保它们分别为最小和最大时间。
|
|
|
|
|
//增加 stepLength 记录步骤数。
|
|
|
|
|
Date actionTime= DateUtils.parseTime(row.getString(4));
|
|
|
|
|
if(startTime==null)
|
|
|
|
|
startTime=actionTime;
|
|
|
|
@ -484,9 +499,13 @@ public class UserVisitAnalyze {
|
|
|
|
|
}
|
|
|
|
|
stepLength++;
|
|
|
|
|
}
|
|
|
|
|
//访问时长(s)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//计算访问时长:
|
|
|
|
|
//计算访问时长(以秒为单位)。
|
|
|
|
|
Long visitLengtth=(endTime.getTime()-startTime.getTime())/1000;
|
|
|
|
|
|
|
|
|
|
//格式化信息并返回:
|
|
|
|
|
String searchKeywordsInfo=StringUtils.trimComma(searchKeywords.toString());
|
|
|
|
|
String clickCategoryIdsInfo=StringUtils.trimComma(clickCategoryIds.toString());
|
|
|
|
|
String info=Constants.FIELD_SESSIONID+"="+sessionId+"|"+Constants.FIELD_SERACH_KEYWORDS+"="+searchKeywordsInfo+"|"
|
|
|
|
@ -498,40 +517,96 @@ public class UserVisitAnalyze {
|
|
|
|
|
|
|
|
|
|
//查询所有的用户数据
|
|
|
|
|
String sql="select * from user_info";
|
|
|
|
|
|
|
|
|
|
//执行SQL查询获取用户信息,并将结果转换为 JavaRDD<Row>。
|
|
|
|
|
JavaRDD<Row> userInfoRDD=sc.sql(sql).javaRDD();
|
|
|
|
|
|
|
|
|
|
//将用户信息映射成map
|
|
|
|
|
//使用 mapToPair 方法将 JavaRDD<Row> 转换为 JavaPairRDD<Long, Row>,
|
|
|
|
|
// 其中键为 Row 对象中的某个列(通常是 userId),值为整个 Row 对象。
|
|
|
|
|
JavaPairRDD<Long,Row> userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction<Row, Long, Row>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<Long, Row> call(Row row) throws Exception {
|
|
|
|
|
return new Tuple2<Long, Row>(row.getLong(0),row);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//将两个信息join在一起
|
|
|
|
|
//使用 join 方法将两个 JavaPairRDD 进行连接,连接键为 userId。结果是一个 JavaPairRDD<Long, Tuple2<String, Row>>,
|
|
|
|
|
// 其中键为 userId,值为一个包含用户会话信息和用户信息的元组。
|
|
|
|
|
JavaPairRDD<Long,Tuple2<String,Row>> tuple2JavaPairRDD=sessionPartInfo.join(userInfoPariRDD);
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* aggregateBySessionId 方法:将用户会话数据按 sessionId 分组,并生成每个会话的详细信息。具体包括搜索关键字、点击类别ID、访问时长、步骤数和开始时间。
|
|
|
|
|
* 查询用户数据:从数据库中查询用户信息。
|
|
|
|
|
* 映射用户信息:将用户信息转换为 JavaPairRDD<Long, Row>。
|
|
|
|
|
* 连接会话信息和用户信息:将用户会话信息和用户信息进行连接,生成包含用户详细信息的会话数据。
|
|
|
|
|
* 通过这些步骤,可以获取每个用户在不同会话中的详细行为信息,为后续的分析和统计提供基础数据。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 拿到所需的session
|
|
|
|
|
*/
|
|
|
|
|
//定义 JavaPairRDD<String, String> 变量 sessionInfo:
|
|
|
|
|
//定义一个名为 sessionInfo 的 JavaPairRDD<String, String>,
|
|
|
|
|
// 并使用 mapToPair 方法对 tuple2JavaPairRDD 进行映射操作。
|
|
|
|
|
JavaPairRDD<String,String> sessionInfo=tuple2JavaPairRDD.mapToPair(new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Tuple2<String, String> call(Tuple2<Long, Tuple2<String, Row>> longTuple2Tuple2) throws Exception {
|
|
|
|
|
|
|
|
|
|
//提取会话信息和用户信息:
|
|
|
|
|
//从 Tuple2<Long, Tuple2<String, Row>> 中提取 sessionPartInfo 和 userInfo。
|
|
|
|
|
//sessionPartInfo 是会话的详细信息,userInfo 是用户的详细信息。
|
|
|
|
|
String sessionPartInfo=longTuple2Tuple2._2._1;
|
|
|
|
|
Row userInfo=longTuple2Tuple2._2._2;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//拿到需要的用户信息
|
|
|
|
|
//从 Row 对象 userInfo 中提取用户的基本信息,
|
|
|
|
|
// 包括年龄 age、职业 professional、城市 city 和性别 sex。
|
|
|
|
|
int age=userInfo.getInt(3);
|
|
|
|
|
String professional=userInfo.getString(4);
|
|
|
|
|
String city=userInfo.getString(5);
|
|
|
|
|
String sex=userInfo.getString(6);
|
|
|
|
|
|
|
|
|
|
//拼接字符串
|
|
|
|
|
//将会话信息 sessionPartInfo 和用户信息拼接成一个完整的字符串 fullInfo。
|
|
|
|
|
String fullInfo=sessionPartInfo+"|"+Constants.FIELD_AGE+"="+age+"|"
|
|
|
|
|
+Constants.FIELD_PROFESSIONAL+"="+professional+"|"+Constants.FIELD_CITY+"="+city+"|"+Constants.FIELD_SEX+"="+sex;
|
|
|
|
|
|
|
|
|
|
//提取会话ID:
|
|
|
|
|
//使用 StringUtils.getFieldFromConcatString 方法从 sessionPartInfo 中提取会话ID,
|
|
|
|
|
// 假设会话ID的格式是 Constants.FIELD_SESSIONID。
|
|
|
|
|
String session=StringUtils.getFieldFromConcatString(sessionPartInfo,"\\|",Constants.FIELD_SESSIONID);
|
|
|
|
|
|
|
|
|
|
//返回结果:
|
|
|
|
|
//返回一个 Tuple2<String, String>,
|
|
|
|
|
// 其中键为会话ID session,值为包含会话信息和用户信息的字符串 fullInfo。
|
|
|
|
|
return new Tuple2<String, String>(session,fullInfo);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
//返回处理后的 sessionInfo。
|
|
|
|
|
return sessionInfo;
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* 功能总结
|
|
|
|
|
* 功能:该方法将 tuple2JavaPairRDD 中包含的会话信息和用户信息进行合并,生成一个包含会话详细信息和用户详细信息的字符串。然后将这些信息封装为 JavaPairRDD<String, String>。
|
|
|
|
|
* 关键步骤:
|
|
|
|
|
* 提取信息:从 tuple2JavaPairRDD 中提取会话信息和用户信息。
|
|
|
|
|
* 提取用户信息:从 Row 对象中提取用户的基本信息,包括年龄、职业、城市和性别。
|
|
|
|
|
* 拼接信息:将会话信息和用户信息拼接成一个完整的字符串。
|
|
|
|
|
* 提取会话ID:从会话信息中提取会话ID。
|
|
|
|
|
* 返回结果:返回一个包含会话ID和完整信息的 JavaPairRDD。
|
|
|
|
|
* 通过这些步骤,可以将用户会话数据和用户信息合并在一起,为后续的分析和统计提供更详细的数据。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|