From 4a66781e531c201f8db6aa12d1c08f1c9139256f Mon Sep 17 00:00:00 2001 From: oeljeklaus-you Date: Sat, 23 Jun 2018 20:21:59 +0800 Subject: [PATCH] =?UTF-8?q?Top10=E5=93=81=E7=B1=BB=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/workspace.xml | 375 +++++++----------- .../java/cn/edu/hust/constant/Constants.java | 4 + .../cn/edu/hust/session/UserVisitAnalyze.java | 229 ++++++++++- 3 files changed, 374 insertions(+), 234 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 11f27ad..7705263 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -2,14 +2,8 @@ - - - - - - + - @@ -22,97 +16,71 @@ - + - - + + - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - - + + - - + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -158,7 +126,6 @@ @@ -279,71 +247,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -675,7 +578,7 @@ - + 1529592741848 @@ -727,7 +630,7 @@ - @@ -743,7 +646,7 @@ - + @@ -1131,8 +1034,8 @@ - - + + @@ -1165,24 +1068,12 @@ - - - - - - - - - - - - - + @@ -1254,25 +1145,15 @@ - - - - - - - - - - - - - - + + + + @@ -1336,14 +1217,6 @@ - - - - - - - - @@ -1383,95 +1256,131 @@ - + - - + + - - + - + + + + + + + + + - + - + - + - - + + + + + + + + + + - + - + - - + + - - - - - - - - - - - - - - - - - - - - - - + + - + - - + + - + - - - - - + + + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/java/cn/edu/hust/constant/Constants.java b/src/main/java/cn/edu/hust/constant/Constants.java index 037c678..4ad5a94 100644 --- a/src/main/java/cn/edu/hust/constant/Constants.java +++ b/src/main/java/cn/edu/hust/constant/Constants.java @@ -25,6 +25,10 @@ public class Constants { public static final String FIELD_VISIT_LENGTH="visitLength"; public static final String FIELD_STEP_LENGTH="stepLength"; public static final String FIELD_START_TIME="startTime"; + public static final String FIELD_CATEGORY_ID="categoryId"; + public static final String FIELD_CLICK_CATEGORY="categoryId"; + public static final String FIELD_ORDER_CATEGORY="clickCategory"; + public static final String FIELD_PAY_CATEGORY="orderCategory"; /** * Spark任务相关厂常量 diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java index 4923504..94672c8 100644 --- a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java +++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java @@ -2,7 +2,6 @@ package cn.edu.hust.session; import cn.edu.hust.conf.ConfigurationManager; import cn.edu.hust.constant.Constants; -import cn.edu.hust.dao.SessionDetailDao; import cn.edu.hust.dao.TaskDao; import cn.edu.hust.dao.factory.DaoFactory; import cn.edu.hust.domain.SessionAggrStat; @@ -101,6 +100,9 @@ public class UserVisitAnalyze { //filteredSessionRDD.count(); //计算各个session占比,并写入MySQL calculateAndPersist(sessionAggrStatAccumulator.value(),taskId); + + // + getTop10Category(filteredSessionRDD,sessionInfoPairRDD); //关闭spark上下文 context.close(); } @@ -595,4 +597,229 @@ public class UserVisitAnalyze { // 插入数据库 DaoFactory.getSessionAggrStatDao().insert(sessionAggrStat); } + + + /** + * 获取top热门品类 + * @param filteredSessionRDD + * @param sessionInfoPairRDD + */ + private static void getTop10Category(JavaPairRDD filteredSessionRDD, JavaPairRDD sessionInfoPairRDD) { + //1.获取符合条件的session梵文的所有品类 + JavaPairRDD sessionId2DetailRDD=filteredSessionRDD.join(sessionInfoPairRDD).mapToPair(new PairFunction>, String, Row>() { + @Override + public Tuple2 call(Tuple2> stringTuple2Tuple2) throws Exception { + + return new Tuple2(stringTuple2Tuple2._1,stringTuple2Tuple2._2._2); + } + }); + + + //2。获取访问的品类id,访问过表示点击,下单,支付 + JavaPairRDD categoryRDD=sessionId2DetailRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() { + @Override + public Iterable> call(Tuple2 stringRowTuple2) throws Exception { + Row row=stringRowTuple2._2; + List> visitCategoryList=new ArrayList>(); + Long clickCategoryId=row.getLong(6); + //点击品类的id + if(clickCategoryId!=null) + visitCategoryList.add(new Tuple2(clickCategoryId,clickCategoryId)); + + String[] orderCategoryIdsSplited=row.getString(8).split(","); + for (String orderCategoryId: + orderCategoryIdsSplited) { + visitCategoryList.add(new Tuple2(Long.valueOf(orderCategoryId),Long.valueOf(orderCategoryId))); + } + + String[] payCategoryIdsSplited=row.getString(10).split(","); + for (String payCategoryId: + payCategoryIdsSplited) { + visitCategoryList.add(new Tuple2(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId))); + } + return visitCategoryList; + } + }); + + //3。计算各个品类的点击,下单和支付次数 + // 3.1 计算点击品类的数量 + JavaPairRDD clickCategoryRDD = getLClickCategoryRDD(sessionId2DetailRDD); + + // 3.2 计算下单的品类的数量 + JavaPairRDD orderCategoryRDD= getOrderCategoryRDD(sessionId2DetailRDD); + + // 3.3 计算支付的品类的数量 + JavaPairRDD payCategoryRDD=getPayCategoryRDD(sessionId2DetailRDD); + + //4.将上述计算的三个字段进行join,注意这里是LeftOuterJoin,因为有些品类只是点击了 + JavaPairRDD categoryCountRDD=joinCategoryAndData(categoryRDD,clickCategoryRDD,orderCategoryRDD,payCategoryRDD); + + //5.自定义二次排序的key + + + } + + /** + * 将几个品类相连接 + * @param categoryRDD + * @param clickCategoryRDD + * @param orderCategoryRDD + * @param payCategoryRDD + * @return + */ + private static JavaPairRDD joinCategoryAndData(JavaPairRDD categoryRDD, JavaPairRDD clickCategoryRDD, JavaPairRDD orderCategoryRDD, JavaPairRDD payCategoryRDD) { + JavaPairRDD>> tmpJoinRDD=categoryRDD.leftOuterJoin(clickCategoryRDD); + + JavaPairRDD tmpRDD=tmpJoinRDD.mapToPair(new PairFunction>>, Long, String>() { + @Override + public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception { + Long categoryId=longTuple2Tuple2._1; + com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2; + Long clickCount=0L; + if(clickIOptional.isPresent()) + { + clickCount=clickIOptional.get(); + } + + String value=Constants.FIELD_CATEGORY_ID+"="+categoryId+"|"+Constants.FIELD_CLICK_CATEGORYIDS+"="+clickCount; + return new Tuple2(categoryId,value); + } + }); + //join下单的次数 + tmpRDD=tmpRDD.leftOuterJoin(orderCategoryRDD).mapToPair(new PairFunction>>, Long, String>() { + @Override + public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception { + Long categoryId=longTuple2Tuple2._1; + com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2; + Long clickCount=0L; + String value=longTuple2Tuple2._2._1; + if(clickIOptional.isPresent()) + { + clickCount=clickIOptional.get(); + } + + value=value+"|"+Constants.FIELD_ORDER_CATEGORY+"="+clickCount; + return new Tuple2(categoryId,value); + } + }); + //join支付的次数 + tmpRDD=tmpRDD.leftOuterJoin(payCategoryRDD).mapToPair(new PairFunction>>, Long, String>() { + @Override + public Tuple2 call(Tuple2>> longTuple2Tuple2) throws Exception { + Long categoryId=longTuple2Tuple2._1; + com.google.common.base.Optional clickIOptional=longTuple2Tuple2._2._2; + Long clickCount=0L; + String value=longTuple2Tuple2._2._1; + if(clickIOptional.isPresent()) + { + clickCount=clickIOptional.get(); + } + + value=value+"|"+Constants.FIELD_PAY_CATEGORY+"="+clickCount; + return new Tuple2(categoryId,value); + } + }); + return tmpRDD; + } + + private static JavaPairRDD getPayCategoryRDD(JavaPairRDD sessionId2DetailRDD) { + JavaPairRDD payActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 stringRowTuple2) throws Exception { + Row row=stringRowTuple2._2; + String categoryIds=row.getString(10); + if(categoryIds==null||"".equals(categoryIds)) return false; + return true; + } + }); + //映射成为新的Pair + JavaPairRDD payCategoryRDD=payActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() { + @Override + public Iterable> call(Tuple2 stringRowTuple2) throws Exception { + List> orderCategoryIds=new ArrayList>(); + Row row=stringRowTuple2._2; + String payCategoryIdsSplited[]=row.getString(10).split(","); + for (String payCategoryId: + payCategoryIdsSplited) { + orderCategoryIds.add(new Tuple2(Long.valueOf(payCategoryId),Long.valueOf(payCategoryId))); + } + return orderCategoryIds; + } + }); + + //计算次数 + JavaPairRDD payCategoryCountRDD=payCategoryRDD.reduceByKey(new Function2() { + @Override + public Long call(Long aLong, Long aLong2) throws Exception { + return aLong+aLong2; + } + }); + + return payCategoryCountRDD; + } + + private static JavaPairRDD getOrderCategoryRDD(JavaPairRDD sessionId2DetailRDD) { + JavaPairRDD orderActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 stringRowTuple2) throws Exception { + Row row=stringRowTuple2._2; + String categoryIds=row.getString(8); + if(categoryIds==null||"".equals(categoryIds)) return false; + return true; + } + }); + //映射成为新的Pair + JavaPairRDD orderCategoryRDD=orderActionRDD.flatMapToPair(new PairFlatMapFunction, Long, Long>() { + @Override + public Iterable> call(Tuple2 stringRowTuple2) throws Exception { + List> orderCategoryIds=new ArrayList>(); + Row row=stringRowTuple2._2; + String orderCategoryIdsSplited[]=row.getString(8).split(","); + for (String orderCategoryId: + orderCategoryIdsSplited) { + orderCategoryIds.add(new Tuple2(Long.valueOf(orderCategoryId),Long.valueOf(orderCategoryId))); + } + return orderCategoryIds; + } + }); + + //计算次数 + JavaPairRDD orderCategoryCountRDD=orderCategoryRDD.reduceByKey(new Function2() { + @Override + public Long call(Long aLong, Long aLong2) throws Exception { + return aLong+aLong2; + } + }); + + return orderCategoryCountRDD; + } + + private static JavaPairRDD getLClickCategoryRDD(JavaPairRDD sessionId2DetailRDD) { + JavaPairRDD clickActionRDD=sessionId2DetailRDD.filter(new Function, Boolean>() { + @Override + public Boolean call(Tuple2 stringRowTuple2) throws Exception { + Row row=stringRowTuple2._2; + Long categoryId=row.getLong(6); + if(categoryId==null) return false; + return true; + } + }); + //映射成为新的Pair + JavaPairRDD clickCategoryRDD=clickActionRDD.mapToPair(new PairFunction, Long,Long>() { + @Override + public Tuple2 call(Tuple2 stringRowTuple2) throws Exception { + Long row=stringRowTuple2._2.getLong(6); + return new Tuple2(row,1L); + } + }); + + //计算次数 + JavaPairRDD clickCategoryCountRDD=clickCategoryRDD.reduceByKey(new Function2() { + @Override + public Long call(Long aLong, Long aLong2) throws Exception { + return aLong+aLong2; + } + }); + return clickCategoryCountRDD; + } }