From c2e142d5953c8eb0baa3b331bafd32d5ad81f99e Mon Sep 17 00:00:00 2001 From: oeljeklaus-you Date: Sat, 23 Jun 2018 14:09:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/workspace.xml | 242 +++++++++++------- .../edu/hust/dao/SessionRandomExtractDao.java | 4 +- .../cn/edu/hust/dao/factory/DaoFactory.java | 7 + .../dao/impl/SessionRandomExtractDaoImpl.java | 14 +- .../cn/edu/hust/session/UserVisitAnalyze.java | 51 +++- 5 files changed, 225 insertions(+), 93 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index e37744c..8c3694c 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,12 +2,9 @@ - - - - - - + + + @@ -24,27 +21,78 @@ - - + + - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -99,7 +147,6 @@ @@ -225,6 +273,25 @@ + + + + + + + + + + + + + + + + + + + @@ -532,7 +599,7 @@ - + 1529592741848 @@ -584,7 +651,7 @@ - @@ -693,14 +760,6 @@ - - - - - - - - @@ -852,19 +911,6 @@ - - - - - - - - - - - - - @@ -1001,17 +1047,6 @@ - - - - - - - - - - - @@ -1258,24 +1293,6 @@ - - - - - - - - - - - - - - - - - - @@ -1305,24 +1322,75 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - - - - - - - - - - - + + + + + + + + + + + + + + + + diff --git a/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java b/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java index 265dca1..f47eaf8 100644 --- a/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java +++ b/src/main/java/cn/edu/hust/dao/SessionRandomExtractDao.java @@ -2,6 +2,8 @@ package cn.edu.hust.dao; import cn.edu.hust.domain.SessionRandomExtract; +import java.util.List; + public interface SessionRandomExtractDao { - void batchInsert(SessionRandomExtract sessionRandomExtract); + void batchInsert(List sessionRandomExtractList); } diff --git a/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java index 0066faf..43d224e 100644 --- a/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java +++ b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java @@ -1,9 +1,12 @@ package cn.edu.hust.dao.factory; import cn.edu.hust.dao.SessionAggrStatDao; +import cn.edu.hust.dao.SessionRandomExtractDao; import cn.edu.hust.dao.TaskDao; import cn.edu.hust.dao.impl.SessionAggrStatDaoImpl; +import cn.edu.hust.dao.impl.SessionRandomExtractDaoImpl; import cn.edu.hust.dao.impl.TaskDaoImpl; +import cn.edu.hust.domain.SessionRandomExtract; public class DaoFactory { /** @@ -19,4 +22,8 @@ public class DaoFactory { { return new SessionAggrStatDaoImpl(); } + + public static SessionRandomExtractDao getSessionRandomExtractDao(){ + return new SessionRandomExtractDaoImpl(); + } } diff --git a/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java b/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java index 1762533..a77fa89 100644 --- a/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java +++ b/src/main/java/cn/edu/hust/dao/impl/SessionRandomExtractDaoImpl.java @@ -4,9 +4,19 @@ import cn.edu.hust.dao.SessionRandomExtractDao; import cn.edu.hust.domain.SessionRandomExtract; import cn.edu.hust.jdbc.JDBCHelper; +import java.util.ArrayList; +import java.util.List; + public class SessionRandomExtractDaoImpl implements SessionRandomExtractDao{ @Override - public void batchInsert(SessionRandomExtract sessionRandomExtract) { - //JDBCHelper.getInstance(). + public void batchInsert(List sessionRandomExtractList) { + String sql="insert into session_random_extract values(?,?,?,?,?)"; + List paramList=new ArrayList(); + for (SessionRandomExtract sessionRandomExtract:sessionRandomExtractList) { + Object[] params=new Object[]{sessionRandomExtract.getTaskId(),sessionRandomExtract.getSessionId() + ,sessionRandomExtract.getStartTime(),sessionRandomExtract.getSearchKeyWords(),sessionRandomExtract.getClick_category_ids()}; + paramList.add(params); + } + JDBCHelper.getInstance().excuteBatch(sql,paramList); } } diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java index c8f7f11..2b0c773 100644 --- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java +++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java @@ -5,6 +5,7 @@ import cn.edu.hust.constant.Constants; import cn.edu.hust.dao.TaskDao; import cn.edu.hust.dao.factory.DaoFactory; import cn.edu.hust.domain.SessionAggrStat; +import cn.edu.hust.domain.SessionRandomExtract; import cn.edu.hust.domain.Task; import cn.edu.hust.mockData.MockData; import cn.edu.hust.util.*; @@ -16,7 +17,9 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; @@ -95,7 +98,7 @@ public class UserVisitAnalyze { /** * 使用CountByKey算子实现随机抽取功能 */ - randomExtractSession(filteredSessionRDD); + randomExtractSession(taskId,filteredSessionRDD); //计算各个session占比,并写入MySQL calculateAndPersist(sessionAggrStatAccumulator.value(),taskId); //关闭spark上下文 @@ -361,9 +364,10 @@ public class UserVisitAnalyze { /** * 随机抽取Sesison功能 + * @param taskId * @param filteredSessionRDD */ - private static void randomExtractSession(JavaPairRDD filteredSessionRDD) { + private static void randomExtractSession(final Long taskId, JavaPairRDD filteredSessionRDD) { //1.先将过滤Seesion进行映射,映射成为Time,Info的数据格式 final JavaPairRDD mapDataRDD=filteredSessionRDD.mapToPair(new PairFunction, String, String>() { @Override @@ -401,7 +405,7 @@ public class UserVisitAnalyze { Random random=new Random(); //设计一个新的数据结构,用于存储随机索引,Key是每一天,Map是小时和随机索引列表构成的 - Map>> dateRandomExtractMap=new HashMap>>(); + final Map>> dateRandomExtractMap=new HashMap>>(); for (Map.Entry> dateHourCount:dateHourCountMap.entrySet()) { @@ -451,6 +455,47 @@ public class UserVisitAnalyze { } } + + //2.将上面计算的RDD进行分组,然后使用FlatMap进行压平,然后判断是否在索引中,如果在,那么将这个信息持久化 + JavaPairRDD> time2GroupRDD=mapDataRDD.groupByKey(); + //将抽取的信息持久化到数据库,并返回SessionIds对,然后和以前的信息Join + JavaPairRDD sessionIds= time2GroupRDD.flatMapToPair(new PairFlatMapFunction>, String, String>() { + @Override + public Iterable> call(Tuple2> tuple2) throws Exception { + String dateStr=tuple2._1; + String date=dateStr.split("_")[0]; + String hour=dateStr.split("_")[1]; + //使用一个List存储sessionId + List> sessionIds=new ArrayList>(); + List indexList=dateRandomExtractMap.get(date).get(hour); + //使用一个list保存需要持久化到数据库的对象 + List sessionRandomExtractList=new ArrayList(); + int index=0; + for (String infos: + tuple2._2) { + if(indexList.contains(index)) + { + //构建SessionRandomExtract + SessionRandomExtract sessionRandomExtract=new SessionRandomExtract(); + final String sessionId=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SESSIONID); + String startTime=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_START_TIME); + String searchKeyWards=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_SERACH_KEYWORDS); + String clickCategoryIds=StringUtils.getFieldFromConcatString(infos,"\\|",Constants.FIELD_CLICK_CATEGORYIDS); + sessionRandomExtract.set(taskId,sessionId,startTime,searchKeyWards,clickCategoryIds); + //添加到List中然后持久化到数据库中 + sessionRandomExtractList.add(sessionRandomExtract); + sessionIds.add(new Tuple2(sessionId,sessionId)); + } + index++; + } + //持久化到数据库 + DaoFactory.getSessionRandomExtractDao().batchInsert(sessionRandomExtractList); + return sessionIds; + } + }); + + //3. 获取session的明细数据保存到数据库 + } //计算各个范围的占比,并持久化到数据库