|
|
|
@ -1,20 +1,25 @@
|
|
|
|
|
package cn.edu.hust.session;
|
|
|
|
|
// 包声明,表明该类所属的包名,用于在项目中对类进行组织和分类管理,方便代码模块化以及避免命名冲突,此处在 cn.edu.hust.session 包下定义类。
|
|
|
|
|
|
|
|
|
|
import cn.edu.hust.constant.Constants;
|
|
|
|
|
import cn.edu.hust.util.StringUtils;
|
|
|
|
|
import org.apache.spark.AccumulatorParam;
|
|
|
|
|
// 导入相关类。Constants类可能存放如时间区间、步数区间等相关常量;StringUtils类提供字符串操作相关工具方法;AccumulatorParam是Apache Spark中的接口,用于自定义累加器行为,本类将实现它来定制累加逻辑。
|
|
|
|
|
|
|
|
|
|
public class SessionAggrStatAccumulator implements AccumulatorParam<String>{
|
|
|
|
|
// 定义一个公共类SessionAggrStatAccumulator,它实现了AccumulatorParam接口,并指定泛型类型为String,意味着该累加器操作的数据类型是字符串,要按接口定义的方法实现针对字符串类型数据的累加规则。
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String addAccumulator(String s, String t1) {
|
|
|
|
|
return add(s,t1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 重写AccumulatorParam接口的addAccumulator方法,接收两个字符串参数s和t1,直接调用类中定义的add方法,并返回其结果,作用是按照自定义规则(在add方法中定义)累加两个字符串所代表的数据,在相应累加场景下发挥作用。
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String addInPlace(String s, String r1) {
|
|
|
|
|
return add(s,r1);
|
|
|
|
|
}
|
|
|
|
|
// 同样是重写AccumulatorParam接口的方法,接收两个字符串参数s和r1,也是调用add方法并返回其结果,和addAccumulator方法类似,用于按特定逻辑对传入的两个字符串数据进行累加处理,不过是在Spark内部不同的累加操作阶段被调用。
|
|
|
|
|
|
|
|
|
|
//主要用于数据的初始化,这里主要返回一个值就是所有范围区间得的数量
|
|
|
|
|
@Override
|
|
|
|
@ -36,6 +41,7 @@ public class SessionAggrStatAccumulator implements AccumulatorParam<String>{
|
|
|
|
|
+ Constants.STEP_PERIOD_30_60 + "=0|"
|
|
|
|
|
+ Constants.STEP_PERIOD_60 + "=0";
|
|
|
|
|
}
|
|
|
|
|
// 重写AccumulatorParam接口的zero方法,用于对累加器进行初始化操作。返回由多个常量(代表不同时间区间、步数区间等统计维度)和对应初始值0拼接而成的字符串,各部分以“|”分隔,意味着初始化时各统计维度的数量都设为0。
|
|
|
|
|
|
|
|
|
|
private String add(String v1,String v2)
|
|
|
|
|
{
|
|
|
|
@ -44,8 +50,11 @@ public class SessionAggrStatAccumulator implements AccumulatorParam<String>{
|
|
|
|
|
if(value!=null)
|
|
|
|
|
{
|
|
|
|
|
int newValue=Integer.valueOf(value)+1;
|
|
|
|
|
return StringUtils.setFieldInConcatString(v1,"\\|",v2,String.valueOf(newValue));
|
|
|
|
|
return StringUtils.setFieldInConcatString(v1,"\\|",v2,String.valueOf(newValue));
|
|
|
|
|
}
|
|
|
|
|
return v1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// 私有方法,供addAccumulator和addInPlace等方法调用。先判断传入的第一个字符串v1是否为空,为空则返回v2。然后通过StringUtils工具类的方法尝试从v1(类似zero方法初始化的格式字符串)中获取和v2对应的字段值(按“|”分割查找),若获取到,将其转成整数加1后再用工具类方法更新回v1对应位置并返回更新后的v1;若没获取到则直接返回原来的v1。
|
|
|
|
|
|
|
|
|
|
// 整体功能解释:
|
|
|
|
|
// 这个类实现了基于Apache Spark的自定义累加器功能,用于统计与会话(Session)相关的聚合统计信息,比如统计不同时间区间(像1s - 3s、4s - 6s等时间范围)以及不同步数区间(如1 - 3步、4 - 6步等范围)内会话的数量情况。zero方法初始化各统计维度计数值为0,addAccumulator和addInPlace方法按照add方法定义的逻辑对相应统计维度的计数值进行累加(每出现符合对应区间的会话相关数据,对应维度计数加1),方便后续在Spark任务执行中对会话多维度聚合统计信息进行收集和汇总。
|