|
|
|
@ -17,39 +17,63 @@ import org.apache.spark.sql.SQLContext;
|
|
|
|
|
import org.apache.spark.sql.types.DataTypes;
|
|
|
|
|
import org.apache.spark.sql.types.StructType;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 模拟数据程序
|
|
|
|
|
* 这个类主要用于模拟生成一些数据,并将其转换为DataFrame格式,
|
|
|
|
|
* 同时注册为临时表,方便后续在Spark SQL环境中进行操作和分析。
|
|
|
|
|
* @author Administrator
|
|
|
|
|
*
|
|
|
|
|
*/
|
|
|
|
|
public class MockData {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 弄你数据
|
|
|
|
|
* @param sc
|
|
|
|
|
* @param sqlContext
|
|
|
|
|
* mock方法用于生成模拟数据,并基于生成的数据创建DataFrame,然后注册为临时表。
|
|
|
|
|
* 它接收JavaSparkContext和SQLContext作为参数,这两个参数是在Spark中进行数据处理和SQL操作的关键上下文对象。
|
|
|
|
|
* @param sc JavaSparkContext对象,用于在Spark中创建和操作分布式数据集(RDD等)。
|
|
|
|
|
* @param sqlContext SQLContext对象,用于在Spark中执行SQL相关操作,如创建DataFrame、注册临时表等。
|
|
|
|
|
*/
|
|
|
|
|
public static void mock(JavaSparkContext sc,
|
|
|
|
|
SQLContext sqlContext) {
|
|
|
|
|
SQLContext sqlContext) {
|
|
|
|
|
|
|
|
|
|
// 创建一个用于存储Row对象的列表,后续将用于构建DataFrame,每个Row对象代表一行数据
|
|
|
|
|
List<Row> rows = new ArrayList<Row>();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 定义一个字符串数组,包含了一些模拟的搜索关键词,用于模拟用户的搜索行为
|
|
|
|
|
String[] searchKeywords = new String[] {"火锅", "蛋糕", "重庆辣子鸡", "重庆小面",
|
|
|
|
|
"呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"};
|
|
|
|
|
|
|
|
|
|
// 通过DateUtils工具类获取今天的日期,作为模拟数据中日期相关字段的基础值
|
|
|
|
|
String date = DateUtils.getTodayDate();
|
|
|
|
|
|
|
|
|
|
// 定义一个字符串数组,包含了用户可能进行的操作类型,如搜索、点击、下单、支付
|
|
|
|
|
String[] actions = new String[]{"search", "click", "order", "pay"};
|
|
|
|
|
|
|
|
|
|
// 创建一个Random对象,用于生成各种随机数,来模拟不同的情况
|
|
|
|
|
Random random = new Random();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 外层循环模拟100个不同的用户
|
|
|
|
|
for(int i = 0; i < 100; i++) {
|
|
|
|
|
long userid = random.nextInt(100);
|
|
|
|
|
|
|
|
|
|
// 为每个用户随机生成一个用户ID,范围在0到99之间
|
|
|
|
|
long userid = random.nextInt(100);
|
|
|
|
|
|
|
|
|
|
// 中层循环模拟每个用户的10次会话
|
|
|
|
|
for(int j = 0; j < 10; j++) {
|
|
|
|
|
String sessionid = UUID.randomUUID().toString().replace("-", "");
|
|
|
|
|
// 生成一个唯一的会话ID,通过UUID生成后去除其中的'-'字符
|
|
|
|
|
String sessionid = UUID.randomUUID().toString().replace("-", "");
|
|
|
|
|
|
|
|
|
|
// 生成一个基础的操作时间,格式为今天的日期加上一个随机的小时数(0到22)
|
|
|
|
|
String baseActionTime = date + " " + random.nextInt(23);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 内层循环模拟每次会话中的多次操作,操作次数是随机的(0到99次之间)
|
|
|
|
|
for(int k = 0; k < random.nextInt(100); k++) {
|
|
|
|
|
long pageid = random.nextInt(10);
|
|
|
|
|
// 为每次操作随机生成一个页面ID,范围在0到9之间
|
|
|
|
|
long pageid = random.nextInt(10);
|
|
|
|
|
|
|
|
|
|
// 生成完整的操作时间,在基础操作时间上补充随机的分钟和秒数,
|
|
|
|
|
// 通过StringUtils的fulfuill方法确保分钟和秒数是两位数格式(不足两位前面补0)
|
|
|
|
|
String actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)));
|
|
|
|
|
|
|
|
|
|
// 初始化一些操作相关的字段为null,后续根据具体的操作类型来赋值
|
|
|
|
|
String searchKeyword = null;
|
|
|
|
|
Long clickCategoryId = null;
|
|
|
|
|
Long clickProductId = null;
|
|
|
|
@ -57,33 +81,45 @@ public class MockData {
|
|
|
|
|
String orderProductIds = null;
|
|
|
|
|
String payCategoryIds = null;
|
|
|
|
|
String payProductIds = null;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 随机选择一个操作类型(从定义的actions数组中随机选取)
|
|
|
|
|
String action = actions[random.nextInt(4)];
|
|
|
|
|
|
|
|
|
|
// 根据选择的操作类型,设置相应的字段值
|
|
|
|
|
if("search".equals(action)) {
|
|
|
|
|
searchKeyword = searchKeywords[random.nextInt(10)];
|
|
|
|
|
// 如果是搜索操作,从搜索关键词数组中随机选取一个作为搜索关键词
|
|
|
|
|
searchKeyword = searchKeywords[random.nextInt(10)];
|
|
|
|
|
} else if("click".equals(action)) {
|
|
|
|
|
clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100)));
|
|
|
|
|
clickProductId = Long.valueOf(String.valueOf(random.nextInt(100)));
|
|
|
|
|
// 如果是点击操作,随机生成点击的分类ID和产品ID(范围在0到99之间)
|
|
|
|
|
clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100)));
|
|
|
|
|
clickProductId = Long.valueOf(String.valueOf(random.nextInt(100)));
|
|
|
|
|
} else if("order".equals(action)) {
|
|
|
|
|
orderCategoryIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
// 如果是下单操作,随机生成下单的分类ID和产品ID(转换为字符串形式)
|
|
|
|
|
orderCategoryIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
orderProductIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
} else if("pay".equals(action)) {
|
|
|
|
|
payCategoryIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
// 如果是支付操作,随机生成支付的分类ID和产品ID(转换为字符串形式)
|
|
|
|
|
payCategoryIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
payProductIds = String.valueOf(random.nextInt(100));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Row row = RowFactory.create(date, userid, sessionid,
|
|
|
|
|
|
|
|
|
|
// 使用RowFactory创建一个Row对象,将本次操作相关的所有字段值传入,代表一行模拟数据
|
|
|
|
|
Row row = RowFactory.create(date, userid, sessionid,
|
|
|
|
|
pageid, actionTime, searchKeyword,
|
|
|
|
|
clickCategoryId, clickProductId,
|
|
|
|
|
orderCategoryIds, orderProductIds,
|
|
|
|
|
payCategoryIds, payProductIds);
|
|
|
|
|
|
|
|
|
|
// 将生成的Row对象添加到rows列表中,不断积累模拟数据
|
|
|
|
|
rows.add(row);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 将存储了Row对象的列表转换为JavaRDD<Row>,使其可以在Spark的分布式环境下进行处理
|
|
|
|
|
JavaRDD<Row> rowsRDD = sc.parallelize(rows);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 定义DataFrame的数据结构(Schema),明确每个字段的名称、数据类型以及是否可为空等信息
|
|
|
|
|
StructType schema = DataTypes.createStructType(Arrays.asList(
|
|
|
|
|
DataTypes.createStructField("date", DataTypes.StringType, true),
|
|
|
|
|
DataTypes.createStructField("user_id", DataTypes.LongType, true),
|
|
|
|
@ -97,36 +133,64 @@ public class MockData {
|
|
|
|
|
DataTypes.createStructField("order_product_ids", DataTypes.StringType, true),
|
|
|
|
|
DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true),
|
|
|
|
|
DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 使用SQLContext基于rowsRDD和定义好的结构schema创建一个DataFrame对象,用于后续的数据操作和分析
|
|
|
|
|
DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);
|
|
|
|
|
|
|
|
|
|
df.registerTempTable("user_visit_action");
|
|
|
|
|
|
|
|
|
|
// 将创建好的DataFrame注册为一个临时表,表名为"user_visit_action",方便后续用SQL语句进行查询等操作
|
|
|
|
|
df.registerTempTable("user_visit_action");
|
|
|
|
|
|
|
|
|
|
// 打印DataFrame的第一行数据,用于简单查看模拟生成的数据情况
|
|
|
|
|
for(Row _row : df.take(1)) {
|
|
|
|
|
System.out.println(_row);
|
|
|
|
|
System.out.println(_row);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* ==================================================================
|
|
|
|
|
* 以下是模拟生成用户基本信息数据的相关代码部分,与上面模拟用户访问行为数据的逻辑类似,但字段不同。
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 清空之前用于存储用户访问行为数据的rows列表,准备存储用户基本信息数据
|
|
|
|
|
rows.clear();
|
|
|
|
|
|
|
|
|
|
// 定义一个字符串数组,包含了两种性别,用于模拟用户的性别信息
|
|
|
|
|
String[] sexes = new String[]{"male", "female"};
|
|
|
|
|
|
|
|
|
|
// 循环模拟生成100个用户的基本信息
|
|
|
|
|
for(int i = 0; i < 100; i ++) {
|
|
|
|
|
// 用户ID直接使用循环变量i,简单递增赋值
|
|
|
|
|
long userid = i;
|
|
|
|
|
|
|
|
|
|
// 生成用户名,格式为"user"加上用户ID
|
|
|
|
|
String username = "user" + i;
|
|
|
|
|
|
|
|
|
|
// 生成姓名,格式为"name"加上用户ID
|
|
|
|
|
String name = "name" + i;
|
|
|
|
|
|
|
|
|
|
// 随机生成用户年龄,范围在0到59岁之间
|
|
|
|
|
int age = random.nextInt(60);
|
|
|
|
|
|
|
|
|
|
// 生成职业信息,格式为"professional"加上一个随机数(范围在0到99之间)
|
|
|
|
|
String professional = "professional" + random.nextInt(100);
|
|
|
|
|
|
|
|
|
|
// 生成所在城市信息,格式为"city"加上一个随机数(范围在0到99之间)
|
|
|
|
|
String city = "city" + random.nextInt(100);
|
|
|
|
|
|
|
|
|
|
// 随机选择一个性别,从定义的sexes数组中随机选取
|
|
|
|
|
String sex = sexes[random.nextInt(2)];
|
|
|
|
|
|
|
|
|
|
Row row = RowFactory.create(userid, username, name, age,
|
|
|
|
|
|
|
|
|
|
// 使用RowFactory创建一个Row对象,将本次用户基本信息相关的所有字段值传入,代表一行模拟数据
|
|
|
|
|
Row row = RowFactory.create(userid, username, name, age,
|
|
|
|
|
professional, city, sex);
|
|
|
|
|
|
|
|
|
|
// 将生成的Row对象添加到rows列表中,积累用户基本信息数据
|
|
|
|
|
rows.add(row);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 将存储了用户基本信息Row对象的列表再次转换为JavaRDD<Row>,以便后续操作
|
|
|
|
|
rowsRDD = sc.parallelize(rows);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 定义用户基本信息DataFrame的数据结构(Schema),明确每个字段的名称、数据类型以及是否可为空等信息
|
|
|
|
|
StructType schema2 = DataTypes.createStructType(Arrays.asList(
|
|
|
|
|
DataTypes.createStructField("user_id", DataTypes.LongType, true),
|
|
|
|
|
DataTypes.createStructField("username", DataTypes.StringType, true),
|
|
|
|
@ -135,13 +199,17 @@ public class MockData {
|
|
|
|
|
DataTypes.createStructField("professional", DataTypes.StringType, true),
|
|
|
|
|
DataTypes.createStructField("city", DataTypes.StringType, true),
|
|
|
|
|
DataTypes.createStructField("sex", DataTypes.StringType, true)));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 使用SQLContext基于新的rowsRDD和定义好的结构schema2创建一个用于存储用户基本信息的DataFrame对象
|
|
|
|
|
DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
|
|
|
|
|
|
|
|
|
|
// 打印用户基本信息DataFrame的第一行数据,用于简单查看模拟生成的数据情况
|
|
|
|
|
for(Row _row : df2.take(1)) {
|
|
|
|
|
System.out.println(_row);
|
|
|
|
|
System.out.println(_row);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
df2.registerTempTable("user_info");
|
|
|
|
|
|
|
|
|
|
// 将用户基本信息的DataFrame注册为一个临时表,表名为"user_info",方便后续用SQL语句进行关联等操作
|
|
|
|
|
df2.registerTempTable("user_info");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|