From 69a0490f06af1e4a58e6079f8360f81a53c2537f Mon Sep 17 00:00:00 2001 From: oeljeklaus-you Date: Sat, 23 Jun 2018 13:25:15 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9A=8F=E6=9C=BA=E6=8A=BD=E5=8F=96=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 随机抽取算法实现 --- .idea/workspace.xml | 517 ++++++++++-------- .../java/cn/edu/hust/constant/Constants.java | 1 + .../edu/hust/dao/SessionRandomExtractDao.java | 7 + .../dao/impl/SessionRandomExtractDaoImpl.java | 12 + .../cn/edu/hust/domain/SessionDetail.java | 132 +++++ .../edu/hust/domain/SessionRandomExtract.java | 62 +++ .../cn/edu/hust/session/UserVisitAnalyze.java | 109 +++- 7 files changed, 594 insertions(+), 246 deletions(-) create mode 100644 src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java create mode 100644 src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java create mode 100644 src/main/java/cn/edu/hust/domain/SessionDetail.java create mode 100644 src/main/java/cn/edu/hust/domain/SessionRandomExtract.java diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 6e9017c..e37744c 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,10 +2,13 @@ + + + + - + - @@ -21,83 +24,37 @@ - - + + - - - - - - - - + + + + + + + + + + + + - - + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -139,7 +96,6 @@ @@ -253,6 +216,24 @@ + + + + + + + + + + + + + + + + + + @@ -292,7 +273,7 @@ - + @@ -551,7 +532,7 @@ - + 1529592741848 @@ -603,7 +584,7 @@ - @@ -619,7 +600,7 @@ - + @@ -707,142 +688,11 @@ - - - file://$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java - 93 - - - - file://$PROJECT_DIR$/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java - 309 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -887,6 +737,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1006,6 +890,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1034,6 +952,7 @@ + @@ -1093,14 +1012,6 @@ - - - - - - - - @@ -1119,27 +1030,6 @@ - - - - - - - - - - - - - - - - - - - - - @@ -1252,6 +1142,25 @@ + + + + + + + + + + + + + + + + + + + @@ -1262,6 +1171,24 @@ + + + + + + + + + + + + + + + + + + @@ -1274,20 +1201,128 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - - - - - - - + + + + + + + + + + + + diff --git a/src/main/java/cn/edu/hust/constant/Constants.java b/src/main/java/cn/edu/hust/constant/Constants.java index 361b9b5..037c678 100644 --- a/src/main/java/cn/edu/hust/constant/Constants.java +++ b/src/main/java/cn/edu/hust/constant/Constants.java @@ -24,6 +24,7 @@ public class Constants { public static final String FIELD_PROFESSIONAL="professional"; public static final String FIELD_VISIT_LENGTH="visitLength"; public static final String FIELD_STEP_LENGTH="stepLength"; + public static final String FIELD_START_TIME="startTime"; /** * Spark任务相关厂常量 diff --git a/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java b/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java new file mode 100644 index 0000000..265dca1 --- /dev/null +++ b/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java @@ -0,0 +1,7 @@ +package cn.edu.hust.dao; + +import cn.edu.hust.domain.SessionRandomExtract; + +public interface SessionRandomExtractDao { + void batchInsert(SessionRandomExtract sessionRandomExtract); +} diff --git a/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java b/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java new file mode 100644 index 0000000..1762533 --- /dev/null +++ b/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java @@ -0,0 +1,12 @@ +package cn.edu.hust.dao.impl; + +import cn.edu.hust.dao.SessionRandomExtractDao; +import cn.edu.hust.domain.SessionRandomExtract; +import cn.edu.hust.jdbc.JDBCHelper; + +public class SessionRandomExtractDaoImpl implements SessionRandomExtractDao{ + @Override + public void batchInsert(SessionRandomExtract sessionRandomExtract) { + //JDBCHelper.getInstance(). + } +} diff --git a/src/main/java/cn/edu/hust/domain/SessionDetail.java b/src/main/java/cn/edu/hust/domain/SessionDetail.java new file mode 100644 index 0000000..7ed37ae --- /dev/null +++ b/src/main/java/cn/edu/hust/domain/SessionDetail.java @@ -0,0 +1,132 @@ +package cn.edu.hust.domain; + +import java.io.Serializable; + +public class SessionDetail implements Serializable{ + private Long taskId; + private Long userId; + private String sessinId; + private Long pageid; + private String actionTime; + private String searchKeyWord; + private Long clickCategoryId; + private Long clickProductId; + private String orderCategoryIds; + private String orderProductIds; + private String payCategoryIds; + private String payProductIds; + + public SessionDetail() { + } + + public SessionDetail(Long taskId, Long userId, String sessinId, Long pageid, String actionTime, String searchKeyWord, Long clickCategoryId, Long clickProductId, String orderCategoryIds, String orderProductIds, String payCategoryIds, String payProductIds) { + this.taskId = taskId; + this.userId = userId; + this.sessinId = sessinId; + this.pageid = pageid; + this.actionTime = actionTime; + this.searchKeyWord = searchKeyWord; + this.clickCategoryId = clickCategoryId; + this.clickProductId = clickProductId; + this.orderCategoryIds = orderCategoryIds; + this.orderProductIds = orderProductIds; + this.payCategoryIds = payCategoryIds; + this.payProductIds = payProductIds; + } + + public Long getTaskId() { + return taskId; + } + + public void setTaskId(Long taskId) { + this.taskId = taskId; + } + + public Long getUserId() { + return userId; + } + + public void setUserId(Long userId) { + this.userId = userId; + } + + public String getSessinId() { + return sessinId; + } + + public void setSessinId(String sessinId) { + this.sessinId = sessinId; + } + + public Long getPageid() { + return pageid; + } + + public void setPageid(Long pageid) { + this.pageid = pageid; + } + + public String getActionTime() { + return actionTime; + } + + public void setActionTime(String actionTime) { + this.actionTime = actionTime; + } + + public String getSearchKeyWord() { + return searchKeyWord; + } + + public void setSearchKeyWord(String searchKeyWord) { + this.searchKeyWord = searchKeyWord; + } + + public Long getClickCategoryId() { + return clickCategoryId; + } + + public void setClickCategoryId(Long clickCategoryId) { + this.clickCategoryId = clickCategoryId; + } + + public Long getClickProductId() { + return clickProductId; + } + + public void setClickProductId(Long clickProductId) { + this.clickProductId = clickProductId; + } + + public String getOrderCategoryIds() { + return orderCategoryIds; + } + + public void setOrderCategoryIds(String orderCategoryIds) { + this.orderCategoryIds = orderCategoryIds; + } + + public String getOrderProductIds() { + return orderProductIds; + } + + public void setOrderProductIds(String orderProductIds) { + this.orderProductIds = orderProductIds; + } + + public String getPayCategoryIds() { + return payCategoryIds; + } + + public void setPayCategoryIds(String payCategoryIds) { + this.payCategoryIds = payCategoryIds; + } + + public String getPayProductIds() { + return payProductIds; + } + + public void setPayProductIds(String payProductIds) { + this.payProductIds = payProductIds; + } +} diff --git a/src/main/java/cn/edu/hust/domain/SessionRandomExtract.java b/src/main/java/cn/edu/hust/domain/SessionRandomExtract.java new file mode 100644 index 0000000..17b51a4 --- /dev/null +++ b/src/main/java/cn/edu/hust/domain/SessionRandomExtract.java @@ -0,0 +1,62 @@ +package cn.edu.hust.domain; + +import java.io.Serializable; + +public class SessionRandomExtract implements Serializable { + private Long taskId; + private String sessionId; + private String startTime; + private String searchKeyWords; + private String click_category_ids; + + public SessionRandomExtract() { + } + + public void set(Long taskId, String sessionId, String startTime, String searchKeyWords, String click_category_ids) { + this.taskId = taskId; + this.sessionId = sessionId; + this.startTime = startTime; + this.searchKeyWords = searchKeyWords; + this.click_category_ids = click_category_ids; + } + + public Long getTaskId() { + return taskId; + } + + public void setTaskId(Long taskId) { + this.taskId = taskId; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public String getSearchKeyWords() { + return searchKeyWords; + } + + public void setSearchKeyWords(String searchKeyWords) { + this.searchKeyWords = searchKeyWords; + } + + public String getClick_category_ids() { + return click_category_ids; + } + + public void setClick_category_ids(String click_category_ids) { + this.click_category_ids = click_category_ids; + } +} diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java index 234b135..c8f7f11 100644 --- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java +++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java @@ -21,11 +21,12 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.catalyst.expressions.Rand; import org.apache.spark.sql.hive.HiveContext; import org.joda.time.DateTime; import scala.Tuple2; -import java.util.Date; +import java.util.*; /** * 用户可以查询的范围包含 @@ -89,7 +90,12 @@ public class UserVisitAnalyze { * 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是 * 一场灾难。 */ + //在使用Accumulutor之前,需要使用Action算子,否则获取的值为空,这里随机计算 filteredSessionRDD.count(); + /** + * 使用CountByKey算子实现随机抽取功能 + */ + randomExtractSession(filteredSessionRDD); //计算各个session占比,并写入MySQL calculateAndPersist(sessionAggrStatAccumulator.value(),taskId); //关闭spark上下文 @@ -98,7 +104,6 @@ public class UserVisitAnalyze { - /** * 用于判断是否是生产环境 * @param sc @@ -207,7 +212,7 @@ public class UserVisitAnalyze { String clickCategoryIdsInfo=StringUtils.trimComma(clickCategoryIds.toString()); String info=Constants.FIELD_SESSIONID+"="+sessionId+"|"+Constants.FIELD_SERACH_KEYWORDS+"="+searchKeywordsInfo+"|" +Constants.FIELD_CLICK_CATEGORYIDS+"="+clickCategoryIdsInfo+"|"+Constants.FIELD_VISIT_LENGTH+"="+visitLengtth+"|" - +Constants.FIELD_STEP_LENGTH+"="+stepLength; + +Constants.FIELD_STEP_LENGTH+"="+stepLength+"|"+Constants.FIELD_START_TIME+"="+startTime; return new Tuple2(userId,info); } }); @@ -354,7 +359,101 @@ public class UserVisitAnalyze { return filteredSessionRDD; } + /** + * 随机抽取Sesison功能 + * @param filteredSessionRDD + */ + private static void randomExtractSession(JavaPairRDD filteredSessionRDD) { + //1.先将过滤Seesion进行映射,映射成为Time,Info的数据格式 + final JavaPairRDD mapDataRDD=filteredSessionRDD.mapToPair(new PairFunction, String, String>() { + @Override + public Tuple2 call(Tuple2 tuple2) throws Exception { + String info=tuple2._2; + //获取开始的时间 + String startTime=StringUtils.getFieldFromConcatString(info,"\\|",Constants.FIELD_START_TIME); + String formatStartTime=DateUtils.getDateHour(startTime); + return new Tuple2(formatStartTime,info); + } + }); + + //计算每一个小时的Session数量 + Map mapCount=mapDataRDD.countByKey(); + + //设计一个新的数据结构Map> dateHourCount,日期作为Key,时间和数量作为Map + Map> dateHourCountMap=new HashMap>(); + //遍历mapCount + for (Map.Entry entry:mapCount.entrySet()) + { + String date=entry.getKey().split("_")[0]; + String hour=entry.getKey().split("_")[1]; + + Map hourCount=dateHourCountMap.get(date); + if(hourCount==null) + { + hourCount=new HashMap(); + dateHourCountMap.put(date,hourCount); + } + hourCount.put(hour,(Long)entry.getValue()); + } + //将数据按照天数平均 + int countPerday=100/dateHourCountMap.size(); + //实现一个随机函数后面将会用到 + + Random random=new Random(); + //设计一个新的数据结构,用于存储随机索引,Key是每一天,Map是小时和随机索引列表构成的 + Map>> dateRandomExtractMap=new HashMap>>(); + + for (Map.Entry> dateHourCount:dateHourCountMap.entrySet()) + { + //日期 + String date=dateHourCount.getKey(); + //每一天个Session个数 + Long sessionCount=0L; + for(Map.Entry hourCountMap:dateHourCount.getValue().entrySet()) + { + sessionCount+=hourCountMap.getValue(); + } + + //获取每一天随机存储的Map + Map> dayExtactMap=dateRandomExtractMap.get(date); + if(dayExtactMap==null) + { + dayExtactMap=new HashMap>(); + dateRandomExtractMap.put(date,dayExtactMap); + } + + //遍历每一个小时,计算出每一个小时的Session占比和抽取的数量 + + for(Map.Entry hourCountMap:dateHourCount.getValue().entrySet()) + { + int extractSize= (int) ((double) hourCountMap.getValue()/sessionCount*countPerday); + + //如果抽离的长度大于被抽取数据的长度,那么抽取的长度就是被抽取长度 + extractSize= extractSize>hourCountMap.getValue()? hourCountMap.getValue().intValue():extractSize; + + //获取存储每一个小时的List + List indexList=dayExtactMap.get(hourCountMap.getKey()); + if(indexList==null) + { + indexList=new ArrayList(); + dayExtactMap.put(hourCountMap.getKey(),indexList); + } + + //使用随机函数生成随机索引 + for(int i=0;i