diff --git a/.idea/libraries/Maven__joda_time_joda_time_2_9.xml b/.idea/libraries/Maven__joda_time_joda_time_2_5.xml
similarity index 67%
rename from .idea/libraries/Maven__joda_time_joda_time_2_9.xml
rename to .idea/libraries/Maven__joda_time_joda_time_2_5.xml
index bb6a71f..7eb240e 100644
--- a/.idea/libraries/Maven__joda_time_joda_time_2_9.xml
+++ b/.idea/libraries/Maven__joda_time_joda_time_2_5.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__net_razorvine_pyrolite_4_9.xml b/.idea/libraries/Maven__net_razorvine_pyrolite_4_4.xml
similarity index 66%
rename from .idea/libraries/Maven__net_razorvine_pyrolite_4_9.xml
rename to .idea/libraries/Maven__net_razorvine_pyrolite_4_4.xml
index 1256ca5..84ff505 100644
--- a/.idea/libraries/Maven__net_razorvine_pyrolite_4_9.xml
+++ b/.idea/libraries/Maven__net_razorvine_pyrolite_4_4.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__net_sf_py4j_py4j_0_8_2_1.xml b/.idea/libraries/Maven__net_sf_py4j_py4j_0_8_2_1.xml
new file mode 100644
index 0000000..b771ebc
--- /dev/null
+++ b/.idea/libraries/Maven__net_sf_py4j_py4j_0_8_2_1.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__net_sf_py4j_py4j_0_9.xml b/.idea/libraries/Maven__net_sf_py4j_py4j_0_9.xml
deleted file mode 100644
index 67ed84d..0000000
--- a/.idea/libraries/Maven__net_sf_py4j_py4j_0_9.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_curator_curator_client_2_6_0.xml b/.idea/libraries/Maven__org_apache_curator_curator_client_2_1_0_incubating.xml
similarity index 50%
rename from .idea/libraries/Maven__org_apache_curator_curator_client_2_6_0.xml
rename to .idea/libraries/Maven__org_apache_curator_curator_client_2_1_0_incubating.xml
index 326a531..f32bd7d 100644
--- a/.idea/libraries/Maven__org_apache_curator_curator_client_2_6_0.xml
+++ b/.idea/libraries/Maven__org_apache_curator_curator_client_2_1_0_incubating.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_5_1.xml
similarity index 55%
rename from .idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_5_1.xml
index d175fef..b157e29 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_catalyst_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_5_1.xml
similarity index 57%
rename from .idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_5_1.xml
index 7fc7b73..f078dfd 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_core_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_5_1.xml
similarity index 57%
rename from .idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_5_1.xml
index b925330..d8dc75e 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_hive_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_5_1.xml
similarity index 55%
rename from .idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_5_1.xml
index da6bb77..0c6463e 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_launcher_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_5_1.xml
similarity index 66%
rename from .idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_5_1.xml
index ea0754c..15537c9 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_network_common_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_5_1.xml
similarity index 66%
rename from .idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_5_1.xml
index f9f01e6..c7748f6 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_network_shuffle_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_5_1.xml
similarity index 58%
rename from .idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_5_1.xml
index d5fdaee..05a746e 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_sql_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_5_1.xml
similarity index 65%
rename from .idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_5_1.xml
index 881bae0..820272a 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_streaming_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_6_1.xml b/.idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_5_1.xml
similarity index 56%
rename from .idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_6_1.xml
rename to .idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_5_1.xml
index f0cc447..b60a3c0 100644
--- a/.idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_6_1.xml
+++ b/.idea/libraries/Maven__org_apache_spark_spark_unsafe_2_10_1_5_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_apache_xbean_xbean_asm5_shaded_4_4.xml b/.idea/libraries/Maven__org_apache_xbean_xbean_asm5_shaded_4_4.xml
deleted file mode 100644
index 9b913be..0000000
--- a/.idea/libraries/Maven__org_apache_xbean_xbean_asm5_shaded_4_4.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_5_11.xml b/.idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_4_5.xml
similarity index 57%
rename from .idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_5_11.xml
rename to .idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_4_5.xml
index 9c200b9..171c39e 100644
--- a/.idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_5_11.xml
+++ b/.idea/libraries/Maven__org_roaringbitmap_RoaringBitmap_0_4_5.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_scala_lang_scala_library_2_10_5.xml b/.idea/libraries/Maven__org_scala_lang_scala_library_2_10_4.xml
similarity index 59%
rename from .idea/libraries/Maven__org_scala_lang_scala_library_2_10_5.xml
rename to .idea/libraries/Maven__org_scala_lang_scala_library_2_10_4.xml
index 8d53435..f159907 100644
--- a/.idea/libraries/Maven__org_scala_lang_scala_library_2_10_5.xml
+++ b/.idea/libraries/Maven__org_scala_lang_scala_library_2_10_4.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_spark_project_hive_hive_cli_1_2_1_spark.xml b/.idea/libraries/Maven__org_spark_project_hive_hive_cli_1_2_1_spark.xml
deleted file mode 100644
index 74af790..0000000
--- a/.idea/libraries/Maven__org_spark_project_hive_hive_cli_1_2_1_spark.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_tachyonproject_tachyon_client_0_8_2.xml b/.idea/libraries/Maven__org_tachyonproject_tachyon_client_0_7_1.xml
similarity index 57%
rename from .idea/libraries/Maven__org_tachyonproject_tachyon_client_0_8_2.xml
rename to .idea/libraries/Maven__org_tachyonproject_tachyon_client_0_7_1.xml
index da6af3b..e14969f 100644
--- a/.idea/libraries/Maven__org_tachyonproject_tachyon_client_0_8_2.xml
+++ b/.idea/libraries/Maven__org_tachyonproject_tachyon_client_0_7_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_8_2.xml b/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_7_1.xml
similarity index 65%
rename from .idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_8_2.xml
rename to .idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_7_1.xml
index 4aafe1f..cd16342 100644
--- a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_8_2.xml
+++ b/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_hdfs_0_7_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_8_2.xml b/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_7_1.xml
similarity index 65%
rename from .idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_8_2.xml
rename to .idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_7_1.xml
index 745e087..845d2cb 100644
--- a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_8_2.xml
+++ b/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_local_0_7_1.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_s3_0_8_2.xml b/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_s3_0_8_2.xml
deleted file mode 100644
index e6e68fd..0000000
--- a/.idea/libraries/Maven__org_tachyonproject_tachyon_underfs_s3_0_8_2.xml
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_2.xml b/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_1_7.xml
similarity index 58%
rename from .idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_2.xml
rename to .idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_1_7.xml
index a93559a..61c0f2b 100644
--- a/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_2.xml
+++ b/.idea/libraries/Maven__org_xerial_snappy_snappy_java_1_1_1_7.xml
@@ -1,13 +1,13 @@
-
+
-
+
-
+
-
+
\ No newline at end of file
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 66a2579..f1dce1c 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -2,7 +2,52 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -14,36 +59,42 @@
-
-
-
+
+
+
-
-
+
+
-
-
+
+
+
+
+
+
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
@@ -54,6 +105,7 @@
@@ -70,13 +122,30 @@
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
@@ -102,10 +171,9 @@
-
-
+
-
+
@@ -149,7 +217,6 @@
-
@@ -172,28 +239,50 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
-
+
-
+
+
+
@@ -206,9 +295,14 @@
+
+
+
+
+
@@ -225,7 +319,7 @@
-
+
@@ -233,6 +327,25 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -247,19 +360,94 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -332,9 +520,20 @@
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
@@ -353,6 +552,8 @@
1529588420966
+
+
1529592741848
@@ -372,39 +573,66 @@
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -412,6 +640,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -450,54 +708,122 @@
-
+
+
+
-
+
-
-
+
+
+
-
+
-
-
+
+
+
-
+
-
-
+
+
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
+
+
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -505,7 +831,6 @@
-
@@ -513,7 +838,6 @@
-
@@ -521,41 +845,268 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/README.md b/README.md
index 0c8134e..ab1e746 100644
--- a/README.md
+++ b/README.md
@@ -327,3 +327,6 @@ task_param:最最重要,用来使用JSON的格式,来封装用户提交的
完成了数据调研、需求分析、技术方案设计、数据设计以后,正式进入编码实现和功能测试阶段。最后才是性能调优阶段。
+## 用户访问Session的比较高端技术
+### 自定义Accumulator
+ 使用自定义Accumulator降低维护成本,一个就可以搞定很多业务需求
\ No newline at end of file
diff --git a/UserActionAnalyzePlatform.iml b/UserActionAnalyzePlatform.iml
index 8c28597..6e9857d 100644
--- a/UserActionAnalyzePlatform.iml
+++ b/UserActionAnalyzePlatform.iml
@@ -12,7 +12,7 @@
-
+
@@ -23,12 +23,10 @@
-
-
-
-
-
-
+
+
+
+
@@ -44,9 +42,9 @@
-
+
-
+
@@ -55,7 +53,7 @@
-
+
@@ -76,17 +74,17 @@
-
+
-
-
-
-
-
+
+
+
+
+
-
-
+
+
@@ -95,11 +93,8 @@
-
+
-
-
-
@@ -114,8 +109,10 @@
+
+
@@ -135,12 +132,12 @@
-
+
-
+
@@ -155,7 +152,6 @@
-
@@ -166,6 +162,7 @@
+
diff --git a/pom.xml b/pom.xml
index 276f6bd..9b55150 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,22 +16,22 @@
org.apache.spark
spark-core_2.10
- 1.6.1
+ 1.5.1
org.apache.spark
spark-sql_2.10
- 1.6.1
+ 1.5.1
org.apache.spark
spark-hive_2.10
- 1.6.1
+ 1.5.1
org.apache.spark
spark-streaming_2.10
- 1.6.1
+ 1.5.1
org.apache.hadoop
diff --git a/src/main/java/cn/edu/hust/conf/ConfigurationManager.java b/src/main/java/cn/edu/hust/conf/ConfigurationManager.java
index 355ca0d..dbae85b 100644
--- a/src/main/java/cn/edu/hust/conf/ConfigurationManager.java
+++ b/src/main/java/cn/edu/hust/conf/ConfigurationManager.java
@@ -35,4 +35,41 @@ public class ConfigurationManager {
{
return prop.getProperty(key);
}
+
+ /**
+ * 获取整数变量
+ * @param key
+ * @return
+ */
+ public static Integer getInteger(String key)
+ {
+ String value=getProperty(key);
+ try
+ {
+ Integer result=Integer.valueOf(value);
+ return result;
+ }
+ catch (Exception e)
+ {
+
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ /**
+ * 获取布尔型
+ * @param key
+ * @return
+ */
+ public static Boolean getBoolean(String key)
+ {
+ String value=getProperty(key);
+ if("false".equals(value))
+ {
+ return false;
+ }
+ return true;
+ }
+
}
diff --git a/src/main/java/cn/edu/hust/constant/Constants.java b/src/main/java/cn/edu/hust/constant/Constants.java
new file mode 100644
index 0000000..361b9b5
--- /dev/null
+++ b/src/main/java/cn/edu/hust/constant/Constants.java
@@ -0,0 +1,59 @@
+package cn.edu.hust.constant;
+
+public class Constants {
+ /**
+ * 项目配置常量
+ */
+ public static final String JDBC_DRIVER="jdbc.driver";
+ public static final String JDBC_URL="jdbc.url";
+ public static final String JDBC_USERNAME="jdbc.username";
+ public static final String JDBC_PSSWORD="jdbc.password";
+ public static final String JDBC_ACTIVE="jdbc.active";
+
+ /**
+ * Spark作业相关常量
+ */
+ public static final String APP_NAME_SESSION="UserVisitAnalyze";
+ public static final String SPARK_LOCAL="spark_local";
+ public static final String FIELD_SESSIONID="sessionId";
+ public static final String FIELD_SERACH_KEYWORDS="searchKeywords";
+ public static final String FIELD_CLICK_CATEGORYIDS="clickCategoryIds";
+ public static final String FIELD_AGE="age";
+ public static final String FIELD_CITY="city";
+ public static final String FIELD_SEX="sex";
+ public static final String FIELD_PROFESSIONAL="professional";
+ public static final String FIELD_VISIT_LENGTH="visitLength";
+ public static final String FIELD_STEP_LENGTH="stepLength";
+
+ /**
+ * Spark任务相关厂常量
+ */
+ public static final String PARAM_STARTTIME ="startDate";
+ public static final String PARAM_ENDTIME ="endDate";
+ public static final String PARAM_STARTAGE ="startAge";
+ public static final String PARAM_ENDAGE ="endAge";
+ public static final String PARAM_PROFESSONALS ="professionals";
+ public static final String PARAM_CIYTIES ="cities";
+ public static final String PARAM_SEX ="sex";
+ public static final String PARAM_SERACH_KEYWORDS="searchKeywords";
+ public static final String PARAM_CLICK_CATEGORYIDS="clickCategoryIds";
+
+ public static final String SESSION_COUNT = "session_count";
+ public static final String TIME_PERIOD_1s_3s = "1s_3s";
+ public static final String TIME_PERIOD_4s_6s = "4s_6s";
+ public static final String TIME_PERIOD_7s_9s = "7s_9s";
+ public static final String TIME_PERIOD_10s_30s = "10s_30s";
+ public static final String TIME_PERIOD_30s_60s = "30s_60s";
+ public static final String TIME_PERIOD_1m_3m = "1m_3m";
+ public static final String TIME_PERIOD_3m_10m = "3m_10m";
+ public static final String TIME_PERIOD_10m_30m = "10m_30m";
+ public static final String TIME_PERIOD_30m = "30m";
+
+ public static final String STEP_PERIOD_1_3 = "1_3";
+ public static final String STEP_PERIOD_4_6 = "4_6";
+ public static final String STEP_PERIOD_7_9 = "7_9";
+ public static final String STEP_PERIOD_10_30 = "10_30";
+ public static final String STEP_PERIOD_30_60 = "30_60";
+ public static final String STEP_PERIOD_60 = "60";
+
+}
diff --git a/src/main/java/cn/edu/hust/dao/TaskDao.java b/src/main/java/cn/edu/hust/dao/TaskDao.java
new file mode 100644
index 0000000..b96ec25
--- /dev/null
+++ b/src/main/java/cn/edu/hust/dao/TaskDao.java
@@ -0,0 +1,7 @@
+package cn.edu.hust.dao;
+
+import cn.edu.hust.domain.Task;
+
+public interface TaskDao {
+ Task findTaskById(Long id);
+}
diff --git a/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java
new file mode 100644
index 0000000..dc4cade
--- /dev/null
+++ b/src/main/java/cn/edu/hust/dao/factory/DaoFactory.java
@@ -0,0 +1,15 @@
+package cn.edu.hust.dao.factory;
+
+import cn.edu.hust.dao.TaskDao;
+import cn.edu.hust.dao.impl.TaskDaoImpl;
+
+public class DaoFactory {
+ /**
+ * 使用工厂模式
+ * @return
+ */
+ public static TaskDao getTaskDao()
+ {
+ return new TaskDaoImpl();
+ }
+}
diff --git a/src/main/java/cn/edu/hust/dao/impl/TaskDaoImpl.java b/src/main/java/cn/edu/hust/dao/impl/TaskDaoImpl.java
new file mode 100644
index 0000000..f8623d7
--- /dev/null
+++ b/src/main/java/cn/edu/hust/dao/impl/TaskDaoImpl.java
@@ -0,0 +1,38 @@
+package cn.edu.hust.dao.impl;
+
+import cn.edu.hust.dao.TaskDao;
+import cn.edu.hust.domain.Task;
+import cn.edu.hust.jdbc.JDBCHelper;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class TaskDaoImpl implements TaskDao{
+ @Override
+ public Task findTaskById(Long id) {
+ String sql="select * from task where task_id=?";
+ final Task task=new Task();
+ JDBCHelper.getInstance().excuteQuery(sql, new Object[]{id}, new JDBCHelper.QueryCallBack() {
+ @Override
+ public void process(ResultSet rs) {
+ try {
+ if(rs.next())
+ {
+ Long id=rs.getLong(1);
+ String taskName=rs.getString(2);
+ String createTime=rs.getString(3);
+ String startTime=rs.getString(4);
+ String finishTime=rs.getString(5);
+ String taskType=rs.getString(6);
+ String taskStatus=rs.getString(7);
+ String taskParam=rs.getString(8);
+ task.set(id,taskName,createTime,startTime,finishTime,taskType,taskStatus,taskParam);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ return task;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/demo/Singleton.java b/src/main/java/cn/edu/hust/demo/Singleton.java
new file mode 100644
index 0000000..900bb7a
--- /dev/null
+++ b/src/main/java/cn/edu/hust/demo/Singleton.java
@@ -0,0 +1,59 @@
+package cn.edu.hust.demo;
+
+/**
+ * 单例模式几种实现方式
+ */
+public class Singleton {
+
+ /**
+ * 饿汉模式实现单例模式,线程安全
+ */
+ /**
+ private static Singleton instance=new Singleton();
+ private Singleton()
+ {
+
+ }
+ public static Singleton getInstance()
+ {
+ return instance;
+ }*/
+
+ /**
+ * 懒汉模式实现单例模式 线程不安全的写法
+
+ private static Singleton instance=null;
+ private Singleton()
+ {
+
+ }
+ public static Singleton getInstance()
+ {
+ if(instance==null) instance=new Singleton();
+ return instance;
+ }*/
+
+ /**
+ * 懒汉模式线程安全的写法
+ */
+ private static Singleton instance=null;
+ private Singleton()
+ {
+
+ }
+ public static Singleton getInstance()
+ {
+ if(instance==null)
+ {
+ synchronized (Singleton.class)
+ {
+ if(instance==null)
+ {
+ instance=new Singleton();
+ }
+ }
+
+ }
+ return instance;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/domain/Task.java b/src/main/java/cn/edu/hust/domain/Task.java
new file mode 100644
index 0000000..bef5826
--- /dev/null
+++ b/src/main/java/cn/edu/hust/domain/Task.java
@@ -0,0 +1,92 @@
+package cn.edu.hust.domain;
+
+import java.io.Serializable;
+
+public class Task implements Serializable{
+ private Long taskId;
+ private String taskName;
+ private String createTime;
+ private String startTime;
+ private String finishTime;
+ private String taskType;
+ private String taskStatus;
+ private String taskParam;
+
+ public Task() {
+ }
+
+ public void set(Long taskId, String taskName, String createTime, String startTime, String finishTime, String taskType, String taskStatus, String taskParam) {
+ this.taskId = taskId;
+ this.taskName = taskName;
+ this.createTime = createTime;
+ this.startTime = startTime;
+ this.finishTime = finishTime;
+ this.taskType = taskType;
+ this.taskStatus = taskStatus;
+ this.taskParam = taskParam;
+ }
+
+ public Long getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(Long taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
+ }
+
+ public String getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(String createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(String startTime) {
+ this.startTime = startTime;
+ }
+
+ public String getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(String finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getTaskType() {
+ return taskType;
+ }
+
+ public void setTaskType(String taskType) {
+ this.taskType = taskType;
+ }
+
+ public String getTaskStatus() {
+ return taskStatus;
+ }
+
+ public void setTaskStatus(String taskStatus) {
+ this.taskStatus = taskStatus;
+ }
+
+ public String getTaskParam() {
+ return taskParam;
+ }
+
+ public void setTaskParam(String taskParam) {
+ this.taskParam = taskParam;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/jdbc/JDBCHelper.java b/src/main/java/cn/edu/hust/jdbc/JDBCHelper.java
new file mode 100644
index 0000000..c414e52
--- /dev/null
+++ b/src/main/java/cn/edu/hust/jdbc/JDBCHelper.java
@@ -0,0 +1,195 @@
+package cn.edu.hust.jdbc;
+
+import cn.edu.hust.conf.ConfigurationManager;
+import cn.edu.hust.constant.Constants;
+
+import java.sql.*;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class JDBCHelper {
+ private static JDBCHelper instance=new JDBCHelper();
+ //使用阻塞队列
+ private LinkedBlockingQueue queue=new LinkedBlockingQueue();
+ static{
+ try {
+ Class.forName(ConfigurationManager.getProperty(Constants.JDBC_DRIVER));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ /**
+ * 在构造函数创建数据库连接池
+ * 结合单例模式,确保数据库连接池单例
+ */
+ private JDBCHelper(){
+ int dataSourceSize=ConfigurationManager.getInteger(Constants.JDBC_ACTIVE);
+ String url=ConfigurationManager.getProperty(Constants.JDBC_URL);
+ String username=ConfigurationManager.getProperty(Constants.JDBC_USERNAME);
+ String passward=ConfigurationManager.getProperty(Constants.JDBC_PSSWORD);
+ try
+ {
+ for(int i=0;i params)
+ {
+ Connection connection=null;
+ PreparedStatement statement=null;
+ int[] res=null;
+ try
+ {
+ connection=getConnection();
+ statement=connection.prepareStatement(sql);
+ //1.取消自动提交
+ connection.setAutoCommit(false);
+ //2.设置参数
+ for (Object[] param:
+ params) {
+ for (int i = 0; i < param.length; i++) {
+ statement.setObject(i+1,param[i]);
+ }
+ statement.addBatch();
+ }
+ //3.批量执行
+ res=statement.executeBatch();
+ //4.最后一步提交
+ connection.commit();
+ return res;
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ finally {
+ if(connection!=null)
+ {
+ try {
+ queue.put(connection);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return res;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/mockData/MockData.java b/src/main/java/cn/edu/hust/mockData/MockData.java
new file mode 100644
index 0000000..f6d6387
--- /dev/null
+++ b/src/main/java/cn/edu/hust/mockData/MockData.java
@@ -0,0 +1,147 @@
+package cn.edu.hust.mockData;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import cn.edu.hust.util.DateUtils;
+import cn.edu.hust.util.StringUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+
+
+/**
+ * 模拟数据程序
+ * @author Administrator
+ *
+ */
+public class MockData {
+
+ /**
+ * 弄你数据
+ * @param sc
+ * @param sqlContext
+ */
+ public static void mock(JavaSparkContext sc,
+ SQLContext sqlContext) {
+ List rows = new ArrayList();
+
+ String[] searchKeywords = new String[] {"火锅", "蛋糕", "重庆辣子鸡", "重庆小面",
+ "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"};
+ String date = DateUtils.getTodayDate();
+ String[] actions = new String[]{"search", "click", "order", "pay"};
+ Random random = new Random();
+
+ for(int i = 0; i < 100; i++) {
+ long userid = random.nextInt(100);
+
+ for(int j = 0; j < 10; j++) {
+ String sessionid = UUID.randomUUID().toString().replace("-", "");
+ String baseActionTime = date + " " + random.nextInt(23);
+
+ for(int k = 0; k < random.nextInt(100); k++) {
+ long pageid = random.nextInt(10);
+ String actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)));
+ String searchKeyword = null;
+ Long clickCategoryId = null;
+ Long clickProductId = null;
+ String orderCategoryIds = null;
+ String orderProductIds = null;
+ String payCategoryIds = null;
+ String payProductIds = null;
+
+ String action = actions[random.nextInt(4)];
+ if("search".equals(action)) {
+ searchKeyword = searchKeywords[random.nextInt(10)];
+ } else if("click".equals(action)) {
+ clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100)));
+ clickProductId = Long.valueOf(String.valueOf(random.nextInt(100)));
+ } else if("order".equals(action)) {
+ orderCategoryIds = String.valueOf(random.nextInt(100));
+ orderProductIds = String.valueOf(random.nextInt(100));
+ } else if("pay".equals(action)) {
+ payCategoryIds = String.valueOf(random.nextInt(100));
+ payProductIds = String.valueOf(random.nextInt(100));
+ }
+
+ Row row = RowFactory.create(date, userid, sessionid,
+ pageid, actionTime, searchKeyword,
+ clickCategoryId, clickProductId,
+ orderCategoryIds, orderProductIds,
+ payCategoryIds, payProductIds);
+ rows.add(row);
+ }
+ }
+ }
+
+ JavaRDD rowsRDD = sc.parallelize(rows);
+
+ StructType schema = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("date", DataTypes.StringType, true),
+ DataTypes.createStructField("user_id", DataTypes.LongType, true),
+ DataTypes.createStructField("session_id", DataTypes.StringType, true),
+ DataTypes.createStructField("page_id", DataTypes.LongType, true),
+ DataTypes.createStructField("action_time", DataTypes.StringType, true),
+ DataTypes.createStructField("search_keyword", DataTypes.StringType, true),
+ DataTypes.createStructField("click_category_id", DataTypes.LongType, true),
+ DataTypes.createStructField("click_product_id", DataTypes.LongType, true),
+ DataTypes.createStructField("order_category_ids", DataTypes.StringType, true),
+ DataTypes.createStructField("order_product_ids", DataTypes.StringType, true),
+ DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true),
+ DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));
+
+ DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);
+
+ df.registerTempTable("user_visit_action");
+ for(Row _row : df.take(1)) {
+ System.out.println(_row);
+ }
+
+ /**
+ * ==================================================================
+ */
+
+ rows.clear();
+ String[] sexes = new String[]{"male", "female"};
+ for(int i = 0; i < 100; i ++) {
+ long userid = i;
+ String username = "user" + i;
+ String name = "name" + i;
+ int age = random.nextInt(60);
+ String professional = "professional" + random.nextInt(100);
+ String city = "city" + random.nextInt(100);
+ String sex = sexes[random.nextInt(2)];
+
+ Row row = RowFactory.create(userid, username, name, age,
+ professional, city, sex);
+ rows.add(row);
+ }
+
+ rowsRDD = sc.parallelize(rows);
+
+ StructType schema2 = DataTypes.createStructType(Arrays.asList(
+ DataTypes.createStructField("user_id", DataTypes.LongType, true),
+ DataTypes.createStructField("username", DataTypes.StringType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true),
+ DataTypes.createStructField("age", DataTypes.IntegerType, true),
+ DataTypes.createStructField("professional", DataTypes.StringType, true),
+ DataTypes.createStructField("city", DataTypes.StringType, true),
+ DataTypes.createStructField("sex", DataTypes.StringType, true)));
+
+ DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
+ for(Row _row : df2.take(1)) {
+ System.out.println(_row);
+ }
+
+ df2.registerTempTable("user_info");
+ }
+
+}
diff --git a/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java b/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java
new file mode 100644
index 0000000..ac54bce
--- /dev/null
+++ b/src/main/java/cn/edu/hust/session/SessionAggrStatAccumulator.java
@@ -0,0 +1,51 @@
+package cn.edu.hust.session;
+
+import cn.edu.hust.constant.Constants;
+import cn.edu.hust.util.StringUtils;
+import org.apache.spark.AccumulatorParam;
+
+public class SessionAggrStatAccumulator implements AccumulatorParam{
+ @Override
+ public String addAccumulator(String s, String t1) {
+ return add(s,t1);
+ }
+
+
+ @Override
+ public String addInPlace(String s, String r1) {
+ return add(s,r1);
+ }
+
+ //主要用于数据的初始化,这里主要返回一个值就是所有范围区间得的数量
+ @Override
+ public String zero(String s) {
+ return Constants.SESSION_COUNT + "=0|"
+ + Constants.TIME_PERIOD_1s_3s + "=0|"
+ + Constants.TIME_PERIOD_4s_6s + "=0|"
+ + Constants.TIME_PERIOD_7s_9s + "=0|"
+ + Constants.TIME_PERIOD_10s_30s + "=0|"
+ + Constants.TIME_PERIOD_30s_60s + "=0|"
+ + Constants.TIME_PERIOD_1m_3m + "=0|"
+ + Constants.TIME_PERIOD_3m_10m + "=0|"
+ + Constants.TIME_PERIOD_10m_30m + "=0|"
+ + Constants.TIME_PERIOD_30m + "=0|"
+ + Constants.STEP_PERIOD_1_3 + "=0|"
+ + Constants.STEP_PERIOD_4_6 + "=0|"
+ + Constants.STEP_PERIOD_7_9 + "=0|"
+ + Constants.STEP_PERIOD_10_30 + "=0|"
+ + Constants.STEP_PERIOD_30_60 + "=0|"
+ + Constants.STEP_PERIOD_60 + "=0";
+ }
+
+ private String add(String v1,String v2)
+ {
+ if(StringUtils.isEmpty(v1)) return v2;
+ String value=StringUtils.getFieldFromConcatString(v1,"\\|",v2);
+ if(value!=null)
+ {
+ int newValue=Integer.valueOf(value)+1;
+ StringUtils.setFieldInConcatString(v1,"\\|",v2,String.valueOf(newValue));
+ }
+ return v1;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
new file mode 100644
index 0000000..e03867b
--- /dev/null
+++ b/src/main/java/cn/edu/hust/session/UserVisitAnalyze.java
@@ -0,0 +1,352 @@
+package cn.edu.hust.session;
+
+import cn.edu.hust.conf.ConfigurationManager;
+import cn.edu.hust.constant.Constants;
+import cn.edu.hust.dao.TaskDao;
+import cn.edu.hust.dao.factory.DaoFactory;
+import cn.edu.hust.domain.Task;
+import cn.edu.hust.mockData.MockData;
+import cn.edu.hust.util.DateUtils;
+import cn.edu.hust.util.ParamUtils;
+import cn.edu.hust.util.StringUtils;
+import cn.edu.hust.util.ValidUtils;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.hive.HiveContext;
+import org.joda.time.DateTime;
+import scala.Tuple2;
+
+import java.util.Date;
+
+/**
+ * 用户可以查询的范围包含
+ * 1。用户的职业
+ * 2。用户的性别
+ * 3。用户城市
+ * 4。用户年龄
+ * 5。获取搜索词
+ * 6。获取点击品类
+ */
+public class UserVisitAnalyze {
+ public static void main(String[] args)
+ {
+ /**
+ * 构建spark上下文
+ */
+ SparkConf conf=new SparkConf().setAppName(Constants.APP_NAME_SESSION).setMaster("local[3]");
+ JavaSparkContext context=new JavaSparkContext(conf);
+ SQLContext sc=getSQLContext(context.sc());
+ //生成模拟数据
+ mock(context,sc);
+
+ //拿到相应的Dao组建
+ TaskDao dao= DaoFactory.getTaskDao();
+ //从外部传入的参数获取任务的id
+ Long taskId=ParamUtils.getTaskIdFromArgs(args);
+ //从数据库中查询出相应的task
+ Task task=dao.findTaskById(taskId);
+ JSONObject jsonObject=JSONObject.parseObject(task.getTaskParam());
+
+ //获取指定范围内的Sesssion
+ JavaRDD sessionRangeDate=getActionRDD(sc,jsonObject);
+ //按照Sesson进行聚合
+ JavaPairRDD sesssionAggregateInfoRDD=aggregateBySessionId(sc,sessionRangeDate);
+
+ //通过条件对RDD进行筛选
+ // 重构,同时统计
+ Accumulator sessionAggrStatAccumulator=context.accumulator("",new SessionAggrStatAccumulator());
+
+ JavaPairRDD filteredSessionRDD=filterSessionAndAggrStat(sesssionAggregateInfoRDD,jsonObject,sessionAggrStatAccumulator);
+
+ //session聚合统计,统计出访问时长和访问步长的各个区间所占的比例
+ /**
+ * 重构实现的思路:
+ * 1。不要去生成任何的新RDD
+ * 2。不要去单独遍历一遍sesion的数据
+ * 3。可以在聚合数据的时候可以直接计算session的访问时长和访问步长
+ * 4。在以前的聚合操作中,可以在以前的基础上进行计算加上自己实现的Accumulator来进行一次性解决
+ * 开发Spark的经验准则
+ * 1。尽量少生成RDD
+ * 2。尽量少对RDD进行蒜子操作,如果可能,尽量在一个算子里面,实现多个需求功能
+ * 3。尽量少对RDD进行shuffle算子操作,比如groupBykey、reduceBykey、sortByKey
+ * shuffle操作,会导致大量的磁盘读写,严重降低性能
+ * 有shuffle的算子,和没有shuffle的算子,甚至性能相差极大
+ * 有shuffle的算子,很容易造成性能倾斜,一旦数据倾斜,简直就是性能杀手
+ * 4。无论做什么功能,性能第一
+ * 在大数据项目中,性能最重要。主要是大数据以及大数据项目的特点,决定了大数据的程序和项目速度,都比较满
+ * 如果不考虑性能的话,就会导致一个大数据处理程序运行长达数个小时,甚至是数个小时,对用户的体验,简直是
+ * 一场灾难。
+ */
+
+ //关闭spark上下文
+ context.close();
+ }
+
+
+
+ /**
+ * 用于判断是否是生产环境
+ * @param sc
+ * @return
+ */
+ public static SQLContext getSQLContext(SparkContext sc)
+ {
+ boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
+ if(local)
+ {
+ return new SQLContext(sc);
+ }
+ return new HiveContext(sc);
+ }
+
+ private static void mock(JavaSparkContext context,SQLContext sc)
+ {
+ boolean local= ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
+ if(local)
+ {
+ MockData.mock(context,sc);
+ }
+
+ }
+
+ /**
+ * 获取指定日期范围内的数据
+ * @param sc
+ * @param taskParam
+ * @return
+ */
+ private static JavaRDD getActionRDD(SQLContext sc, JSONObject taskParam)
+ {
+ String startTime=ParamUtils.getParam(taskParam,Constants.PARAM_STARTTIME);
+ String endTime=ParamUtils.getParam(taskParam,Constants.PARAM_ENDTIME);
+ String sql="select *from user_visit_action where date>='"+startTime+"' and date<='"+endTime+"'";
+ DataFrame df=sc.sql(sql);
+ return df.javaRDD();
+ }
+
+ private static JavaPairRDD aggregateBySessionId(SQLContext sc, JavaRDD sessionRangeDate) {
+ /**
+ * 先将数据映射成map格式
+ */
+ JavaPairRDD sessionActionPair=sessionRangeDate.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getString(2),row);
+ }
+ });
+ /**
+ * 根据sessionId进行分组
+ */
+ JavaPairRDD> sessionActionGrouped=sessionActionPair.groupByKey();
+
+ JavaPairRDD sessionPartInfo=sessionActionGrouped.mapToPair(new PairFunction>, Long, String>() {
+ @Override
+ public Tuple2 call(Tuple2> stringIterableTuple2) throws Exception {
+ String sessionId=stringIterableTuple2._1;
+ Iterable rows=stringIterableTuple2._2;
+ StringBuffer searchKeywords=new StringBuffer();
+ StringBuffer clickCategoryIds=new StringBuffer();
+ Long userId=null;
+ Date startTime=null;
+ Date endTime=null;
+ int stepLength=0;
+ for (Row row:rows)
+ {
+ if(userId==null)
+ userId=row.getLong(1);
+ String searchKeyword=row.getString(5);
+ Long clickCategoryId=row.getLong(6);
+ //判断是否需要拼接
+ if(StringUtils.isNotEmpty(searchKeyword))
+ {
+ if(!searchKeywords.toString().contains(searchKeyword))
+ searchKeywords.append(searchKeyword+",");
+ }
+
+ if(clickCategoryId!=null)
+ {
+ if(!clickCategoryId.toString().contains(String.valueOf(clickCategoryId)))
+ clickCategoryIds.append(String.valueOf(clickCategoryId)+",");
+ }
+
+ //计算session开始时间和结束时间
+ Date actionTime= DateUtils.parseTime(row.getString(4));
+ if(startTime==null)
+ startTime=actionTime;
+ if(endTime==null)
+ endTime=null;
+ if(actionTime.before(startTime))
+ {
+ startTime=actionTime;
+ }
+ if(actionTime.after(endTime))
+ {
+ endTime=actionTime;
+ }
+ stepLength++;
+ }
+ //访问时长(s)
+ Long visitLengtth=(endTime.getTime()-startTime.getTime())/1000;
+
+ String searchKeywordsInfo=StringUtils.trimComma(searchKeywords.toString());
+ String clickCategoryIdsInfo=StringUtils.trimComma(clickCategoryIds.toString());
+ String info=Constants.FIELD_SESSIONID+"="+sessionId+"|"+Constants.FIELD_SERACH_KEYWORDS+"="+searchKeywordsInfo+"|"
+ +Constants.FIELD_CLICK_CATEGORYIDS+"="+clickCategoryIdsInfo+Constants.FIELD_VISIT_LENGTH+"="+visitLengtth+"|"
+ +Constants.FIELD_STEP_LENGTH+"="+stepLength;
+ return new Tuple2(userId,info);
+ }
+ });
+
+ //查询所有的用户数据
+ String sql="select * from user_info";
+ JavaRDD userInfoRDD=sc.sql(sql).javaRDD();
+ //将用户信息映射成map
+ JavaPairRDD userInfoPariRDD=userInfoRDD.mapToPair(new PairFunction() {
+ @Override
+ public Tuple2 call(Row row) throws Exception {
+ return new Tuple2(row.getLong(0),row);
+ }
+ });
+ //将两个信息join在一起
+ JavaPairRDD> tuple2JavaPairRDD=sessionPartInfo.join(userInfoPariRDD);
+
+ /**
+ * 拿到所需的session
+ */
+ JavaPairRDD sessionInfo=tuple2JavaPairRDD.mapToPair(new PairFunction>, String, String>() {
+ @Override
+ public Tuple2 call(Tuple2> longTuple2Tuple2) throws Exception {
+ String sessionPartInfo=longTuple2Tuple2._2._1;
+ Row userInfo=longTuple2Tuple2._2._2;
+ //拿到需要的用户信息
+ int age=userInfo.getInt(3);
+ String professional=userInfo.getString(4);
+ String city=userInfo.getString(5);
+ String sex=userInfo.getString(6);
+ //拼接字符串
+ String fullInfo=sessionPartInfo+"|"+Constants.FIELD_AGE+"="+age+"|"
+ +Constants.FIELD_PROFESSIONAL+"="+professional+"|"+Constants.FIELD_CITY+"="+city+"|"+Constants.FIELD_SEX+"="+sex;
+ String session=StringUtils.getFieldFromConcatString(sessionPartInfo,"\\|",Constants.FIELD_SESSIONID);
+ return new Tuple2(session,fullInfo);
+ }
+ });
+
+ return sessionInfo;
+ }
+
+
+ /**
+ * 根据条件进行session的筛选
+ * @param sessionInfoRDD
+ * @param taskParam
+ * @param sessionAggrStatAccumulator
+ * @return
+ */
+ private static JavaPairRDD filterSessionAndAggrStat(JavaPairRDD sessionInfoRDD, final JSONObject taskParam, final Accumulator sessionAggrStatAccumulator){
+ //得到条件
+ String startAge=ParamUtils.getParam(taskParam,Constants.PARAM_STARTAGE);
+ String endAge=ParamUtils.getParam(taskParam,Constants.PARAM_ENDAGE);
+ String professionals=ParamUtils.getParam(taskParam,Constants.PARAM_PROFESSONALS);
+ String cities=ParamUtils.getParam(taskParam,Constants.PARAM_CIYTIES);
+ String sex= ParamUtils.getParam(taskParam,Constants.PARAM_SEX);
+ String keyWords=ParamUtils.getParam(taskParam,Constants.PARAM_SERACH_KEYWORDS);
+ String categoryIds=ParamUtils.getParam(taskParam,Constants.PARAM_CLICK_CATEGORYIDS);
+
+ //拼接时间
+ String _paramter=(startAge!=null?Constants.PARAM_STARTAGE+"="+startAge+"|":"")+
+ (endAge!=null?Constants.PARAM_ENDAGE+"="+endAge+"|":"")+(professionals!=null?Constants.PARAM_PROFESSONALS+"="+professionals+"|":"")+
+ (cities!=null?Constants.PARAM_CIYTIES+"="+cities+"|":"")+(sex!=null?Constants.PARAM_SEX+"="+sex+"|":"")+
+ (keyWords!=null?Constants.PARAM_SERACH_KEYWORDS+"="+keyWords+"|":"")+(categoryIds!=null?Constants.PARAM_CLICK_CATEGORYIDS+"="+categoryIds+"|":"");
+
+
+ if(_paramter.endsWith("\\|"))
+ _paramter=_paramter.substring(0,_paramter.length()-1);
+
+ final String paramter=_paramter;
+
+ JavaPairRDD filteredSessionRDD=sessionInfoRDD.filter(new Function, Boolean>() {
+ @Override
+ public Boolean call(Tuple2 tuple2) throws Exception {
+ String sessionInfo=tuple2._2;
+ //按照条件进行过滤
+ //按照年龄进行过滤
+ if(!ValidUtils.between(sessionInfo,Constants.FIELD_AGE,paramter,Constants.PARAM_STARTAGE,Constants.PARAM_ENDAGE))
+ return false;
+ //按照职业进行过滤
+ if(!ValidUtils.in(sessionInfo,Constants.FIELD_PROFESSIONAL,paramter,Constants.PARAM_PROFESSONALS))
+ return false;
+ //按照城市进行过滤
+ if(!ValidUtils.in(sessionInfo,Constants.FIELD_CITY,paramter,Constants.PARAM_CIYTIES))
+ return false;
+ //按照性别进行筛选
+ if(!ValidUtils.equal(sessionInfo,Constants.FIELD_SEX,paramter,Constants.PARAM_SEX))
+ return false;
+ //按照搜索词进行过滤,只要有一个搜索词即可
+ if(!ValidUtils.in(sessionInfo,Constants.FIELD_SERACH_KEYWORDS,paramter,Constants.PARAM_PROFESSONALS))
+ return false;
+ if(!ValidUtils.in(sessionInfo,Constants.FIELD_CLICK_CATEGORYIDS,paramter,Constants.FIELD_CLICK_CATEGORYIDS))
+ return false;
+ //如果经过了之前的所有的过滤条件,也就是满足用户筛选条件
+ sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
+ //计算出访问时长和访问步长的范围并进行相应的累加
+ Long visitLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_VISIT_LENGTH));
+ Long stepLength=Long.valueOf(StringUtils.getFieldFromConcatString(sessionInfo,"\\|",Constants.FIELD_STEP_LENGTH));
+ //使用函数进行统计
+ calculateVisitLength(visitLength);
+ calculateStepLength(stepLength);
+ return true;
+ }
+
+ //统计访问时长的数量
+ private void calculateVisitLength(Long visitLegth)
+ {
+ if(visitLegth>=1&&visitLegth<=3)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
+ else if(visitLegth>=4&&visitLegth<=6)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
+ else if(visitLegth>=7&&visitLegth<=9)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
+ else if(visitLegth>=10&&visitLegth<=30)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
+ else if(visitLegth>30&&visitLegth<=60)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
+ else if(visitLegth>60&&visitLegth<=180)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
+ else if(visitLegth>180&&visitLegth<=600)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
+ else if(visitLegth>600&&visitLegth<=1800)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
+ else if(visitLegth>1800)
+ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
+ }
+ //统计访问步长的数量
+ private void calculateStepLength(Long stepLength)
+ {
+ if(stepLength>=1&&stepLength<=3)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
+ else if(stepLength>=4&&stepLength<=6)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
+ else if(stepLength>=7&&stepLength<=9)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
+ else if(stepLength>=10&&stepLength<=30)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
+ else if(stepLength>30&&stepLength<=60)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
+ else if(stepLength>60)
+ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
+ }
+ });
+ return filteredSessionRDD;
+ }
+}
diff --git a/src/main/java/cn/edu/hust/util/DateUtils.java b/src/main/java/cn/edu/hust/util/DateUtils.java
index 7f1292d..306f6e2 100644
--- a/src/main/java/cn/edu/hust/util/DateUtils.java
+++ b/src/main/java/cn/edu/hust/util/DateUtils.java
@@ -1,5 +1,6 @@
package cn.edu.hust.util;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
@@ -127,5 +128,15 @@ public class DateUtils {
public static String formatTime(Date date) {
return TIME_FORMAT.format(date);
}
+
+ public static Date parseTime(String time)
+ {
+ try {
+ return TIME_FORMAT.parse(time);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
}
diff --git a/src/main/java/cn/edu/hust/util/StringUtils.java b/src/main/java/cn/edu/hust/util/StringUtils.java
index 76b8104..0919d52 100644
--- a/src/main/java/cn/edu/hust/util/StringUtils.java
+++ b/src/main/java/cn/edu/hust/util/StringUtils.java
@@ -64,10 +64,12 @@ public class StringUtils {
String delimiter, String field) {
String[] fields = str.split(delimiter);
for(String concatField : fields) {
- String fieldName = concatField.split("=")[0];
- String fieldValue = concatField.split("=")[1];
- if(fieldName.equals(field)) {
- return fieldValue;
+ if(concatField.split("=").length==2) {
+ String fieldName = concatField.split("=")[0];
+ String fieldValue = concatField.split("=")[1];
+ if (fieldName.equals(field)) {
+ return fieldValue;
+ }
}
}
return null;
diff --git a/src/main/resources/conf.properties b/src/main/resources/conf.properties
index f995a3b..97b31cb 100644
--- a/src/main/resources/conf.properties
+++ b/src/main/resources/conf.properties
@@ -1,2 +1,6 @@
-key1=value1
-key2=value2
\ No newline at end of file
+jdbc.driver=com.mysql.jdbc.Driver
+jdbc.url=jdbc:mysql://10.211.55.16:3306/BigDataPlatm?useUnicode=true&&characterEncoding=UTF-8
+jdbc.username=root
+jdbc.password=root
+jdbc.active=10
+spark.local=true;
diff --git a/src/test/java/cn/edu/hust/dao/TaskDaoTest.java b/src/test/java/cn/edu/hust/dao/TaskDaoTest.java
new file mode 100644
index 0000000..8709594
--- /dev/null
+++ b/src/test/java/cn/edu/hust/dao/TaskDaoTest.java
@@ -0,0 +1,14 @@
+package cn.edu.hust.dao;
+
+import cn.edu.hust.dao.factory.DaoFactory;
+import cn.edu.hust.domain.Task;
+import org.junit.Test;
+
+public class TaskDaoTest {
+ @Test
+ public void testDao()
+ {
+ Task task=DaoFactory.getTaskDao().findTaskById(1L);
+ System.out.println(task.getTaskName()+":"+task.getTaskParam());
+ };
+}
diff --git a/src/test/java/cn/edu/hust/jdbc/JDBCHelperTest.java b/src/test/java/cn/edu/hust/jdbc/JDBCHelperTest.java
new file mode 100644
index 0000000..dbcc9ae
--- /dev/null
+++ b/src/test/java/cn/edu/hust/jdbc/JDBCHelperTest.java
@@ -0,0 +1,49 @@
+package cn.edu.hust.jdbc;
+
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class JDBCHelperTest {
+ @Test
+ public void testUpdate()
+ {
+ String sql="insert into user(username,age) values(?,?)";
+ Object[] params={"zhangsan",12};
+ JDBCHelper.getInstance().excuteUpdate(sql,params);
+ }
+
+
+ @Test
+ public void testBatch()
+ {
+ String sql="insert into user(username,age) values(?,?)";
+ List