diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 8c3694c..2dce933 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,9 +2,11 @@ - + + + - + @@ -21,84 +23,139 @@ - - + + - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + - - + + - - + + - + + + + - - + + - - + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - - - - - + + + - - + + - - + + + + - - + + - - - + + + + + @@ -154,9 +211,12 @@ - @@ -760,20 +839,6 @@ - - - - - - - - - - - - - - @@ -1047,16 +1112,6 @@ - - - - - - - - - - @@ -1236,63 +1291,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -1338,13 +1336,12 @@ - + - - + + - - + @@ -1359,38 +1356,130 @@ - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/java/cn/edu/hust/dao/SessionDetailDao.java b/src/main/java/cn/edu/hust/dao/SessionDetailDao.java new file mode 100644 index 0000000..5eabddd --- /dev/null +++ b/src/main/java/cn/edu/hust/dao/SessionDetailDao.java @@ -0,0 +1,7 @@ +package cn.edu.hust.dao; + +import cn.edu.hust.domain.SessionDetail; + +public interface SessionDetailDao { + void insert(SessionDetail sessionDetail); +} diff --git a/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java index 43d224e..08d6a41 100644 --- a/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java +++ b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java @@ -1,11 +1,14 @@ package cn.edu.hust.dao.factory; import cn.edu.hust.dao.SessionAggrStatDao; +import cn.edu.hust.dao.SessionDetailDao; import cn.edu.hust.dao.SessionRandomExtractDao; import cn.edu.hust.dao.TaskDao; import cn.edu.hust.dao.impl.SessionAggrStatDaoImpl; +import cn.edu.hust.dao.impl.SessionDetailDaoImpl; import cn.edu.hust.dao.impl.SessionRandomExtractDaoImpl; import cn.edu.hust.dao.impl.TaskDaoImpl; +import cn.edu.hust.domain.SessionDetail; import cn.edu.hust.domain.SessionRandomExtract; public class DaoFactory { @@ -26,4 +29,9 @@ public class DaoFactory { public static SessionRandomExtractDao getSessionRandomExtractDao(){ return new SessionRandomExtractDaoImpl(); } + + public static SessionDetailDao getSessionDetailDao() + { + return new SessionDetailDaoImpl(); + } } diff --git a/src/main/java/cn/edu/hust/dao/impl/SessionDetailDaoImpl.java b/src/main/java/cn/edu/hust/dao/impl/SessionDetailDaoImpl.java new file mode 100644 index 0000000..f476814 --- /dev/null +++ b/src/main/java/cn/edu/hust/dao/impl/SessionDetailDaoImpl.java @@ -0,0 +1,17 @@ +package cn.edu.hust.dao.impl; + +import cn.edu.hust.dao.SessionDetailDao; +import cn.edu.hust.domain.SessionDetail; +import cn.edu.hust.jdbc.JDBCHelper; + +public class SessionDetailDaoImpl implements SessionDetailDao{ + @Override + public void insert(SessionDetail sessionDetail) { + String sql="insert into session_detail values(?,?,?,?,?,?,?,?,?,?,?,?)"; + Object[] object=new Object[]{sessionDetail.getTaskId(),sessionDetail.getUserId(), + sessionDetail.getSessinId(),sessionDetail.getPageid(),sessionDetail.getActionTime(), + sessionDetail.getSearchKeyWord(),sessionDetail.getClickCategoryId(),sessionDetail.getClickProductId() + ,sessionDetail.getOrderCategoryIds(),sessionDetail.getOrderProductIds(),sessionDetail.getPayCategoryIds(),sessionDetail.getPayProductIds()}; + JDBCHelper.getInstance().excuteUpdate(sql,object); + } +} diff --git a/src/main/java/cn/edu/hust/domain/SessionDetail.java b/src/main/java/cn/edu/hust/domain/SessionDetail.java index 7ed37ae..e438d25 100644 --- a/src/main/java/cn/edu/hust/domain/SessionDetail.java +++ b/src/main/java/cn/edu/hust/domain/SessionDetail.java @@ -19,7 +19,7 @@ public class SessionDetail implements Serializable{ public SessionDetail() { } - public SessionDetail(Long taskId, Long userId, String sessinId, Long pageid, String actionTime, String searchKeyWord, Long clickCategoryId, Long clickProductId, String orderCategoryIds, String orderProductIds, String payCategoryIds, String payProductIds) { + public void set(Long taskId, Long userId, String sessinId, Long pageid, String actionTime, String searchKeyWord, Long clickCategoryId, Long clickProductId, String orderCategoryIds, String orderProductIds, String payCategoryIds, String payProductIds) { this.taskId = taskId; this.userId = userId; this.sessinId = sessinId; diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java index 2b0c773..6001b6b 100644 --- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java +++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java @@ -2,31 +2,27 @@ package cn.edu.hust.session; import cn.edu.hust.conf.ConfigurationManager; import cn.edu.hust.constant.Constants; +import cn.edu.hust.dao.SessionDetailDao; import cn.edu.hust.dao.TaskDao; import cn.edu.hust.dao.factory.DaoFactory; import cn.edu.hust.domain.SessionAggrStat; +import cn.edu.hust.domain.SessionDetail; import cn.edu.hust.domain.SessionRandomExtract; import cn.edu.hust.domain.Task; import cn.edu.hust.mockData.MockData; import cn.edu.hust.util.*; import com.alibaba.fastjson.JSONObject; import org.apache.spark.Accumulator; -import org.apache.spark.AccumulatorParam; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.api.java.function.*; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.catalyst.expressions.Rand; import org.apache.spark.sql.hive.HiveContext; -import org.joda.time.DateTime; import scala.Tuple2; import java.util.*; @@ -63,6 +59,8 @@ public class UserVisitAnalyze { //获取指定范围内的Sesssion JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject); + //这里增加一个新的方法,主要是映射 + JavaPairRDD sessionInfoPairRDD=getSessonInfoPairRDD(sessionRangeDate); //按照Sesson进行聚合 JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionRangeDate); @@ -98,7 +96,7 @@ public class UserVisitAnalyze { /** * 使用CountByKey算子实现随机抽取功能 */ - randomExtractSession(taskId,filteredSessionRDD); + randomExtractSession(taskId,filteredSessionRDD,sessionInfoPairRDD); //计算各个session占比,并写入MySQL calculateAndPersist(sessionAggrStatAccumulator.value(),taskId); //关闭spark上下文 @@ -147,6 +145,26 @@ public class UserVisitAnalyze { return df.javaRDD(); } + /** + * 将数据进行映射成为Pair,键为SessionId,Value为Row + * @param sessionRangeDate + * @return + */ + private static JavaPairRDD getSessonInfoPairRDD(JavaRDD sessionRangeDate) { + return sessionRangeDate.mapToPair(new PairFunction() { + @Override + public Tuple2 call(Row row) throws Exception { + return new Tuple2(row.getString(2),row); + } + }); + } + + /** + * session粒度的聚合 + * @param sc + * @param sessionRangeDate + * @return + */ private static JavaPairRDD aggregateBySessionId(SQLContext sc, JavaRDD sessionRangeDate) { /** * 先将数据映射成map格式 @@ -366,8 +384,9 @@ public class UserVisitAnalyze { * 随机抽取Sesison功能 * @param taskId * @param filteredSessionRDD + * @param sessionInfoPairRDD */ - private static void randomExtractSession(final Long taskId, JavaPairRDD filteredSessionRDD) { + private static void randomExtractSession(final Long taskId, JavaPairRDD filteredSessionRDD, JavaPairRDD sessionInfoPairRDD) { //1.先将过滤Seesion进行映射,映射成为Time,Info的数据格式 final JavaPairRDD mapDataRDD=filteredSessionRDD.mapToPair(new PairFunction, String, String>() { @Override @@ -495,7 +514,33 @@ public class UserVisitAnalyze { }); //3. 获取session的明细数据保存到数据库 - + JavaPairRDD> sessionDetailRDD= sessionIds.join(sessionInfoPairRDD); + final SessionDetailDao sessionDetailDao=DaoFactory.getSessionDetailDao(); + sessionDetailRDD.foreachPartition(new VoidFunction>>>() { + @Override + public void call(Iterator>> tuple2Iterator) throws Exception { + while(tuple2Iterator.hasNext()) + { + Tuple2> tuple2=tuple2Iterator.next(); + Row row=tuple2._2._2; + String sessionId=tuple2._1; + Long userId=row.getLong(1); + Long pageId=row.getLong(3); + String actionTime=row.getString(4); + String searchKeyWard=row.getString(5); + Long clickCategoryId=row.getLong(6); + Long clickProducetId=row.getLong(7); + String orderCategoryId=row.getString(8); + String orderProducetId=row.getString(9); + String payCategoryId=row.getString(10); + String payProducetId=row.getString(11); + SessionDetail sessionDetail=new SessionDetail(); + sessionDetail.set(taskId,userId,sessionId,pageId,actionTime,searchKeyWard,clickCategoryId,clickProducetId,orderCategoryId,orderProducetId,payCategoryId,payProducetId); + sessionDetailDao.insert(sessionDetail); + } + } + }); + } //计算各个范围的占比,并持久化到数据库