diff --git a/SUMMARY.md b/SUMMARY.md index d17e204..74c46c2 100644 --- a/SUMMARY.md +++ b/SUMMARY.md @@ -17,7 +17,8 @@ * [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md) * [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对军事类数据的查询次数进行可视化展示.md) * [第四章 Spark图数据计算与分析实战](/chapter4/4Spark图数据计算简介.md) - * [4.1 SparkGraphX计算生成最短路径](/chapter4/4.1SparkGraphX计算生成最短路径.md) + * [4.1 SparkGraphX定义图结构](/chapter4/4.1SparkGraphX定义图结构.md) + * [4.2 SparkGraphX计算生成最短路径](/chapter4/4.2SparkGraphX计算生成最短路径.md) * [第五章 Spark机器学习实战](/chapter5/5Spark机器学习实战简介.md) * [5.1 Spark机器学习入门](/chapter5/5.1Spark机器学习入门.md) * [5.2 Spark机器学习实战](/chapter5/5.2Spark机器学习实战.md) diff --git a/chapter4/4.1SparkGraphX定义图结构.md b/chapter4/4.1SparkGraphX定义图结构.md new file mode 100644 index 0000000..c8b9325 --- /dev/null +++ b/chapter4/4.1SparkGraphX定义图结构.md @@ -0,0 +1,165 @@ +## 4.1 Spark GraphX 定义图结构 + +### 4.1.1 图的基本概念 +图是由顶点集合`(vertex)`及顶点间的关系集合`(边edge)`组成的一种数据结构。可以对事物以及事物之间的关系建模,也可以用来表示自然发生的连接数据,如:社交网络、互联网`web`页面。 + +常用的应用有:在地图应用中找到最短路径、基于与他人的相似度图,推荐产品、服务、人际关系或媒体。 +### 4.1.2 术语 + +**顶点和边** + +在一般的关系图中,事物为顶点,关系为边。 + +**有向图和无向图** + +在有向图中,一条边的两个顶点一般扮演者不同的角色,比如父子关系、页面`A`连接向页面`B`; + +在一个无向图中,边没有方向,即关系都是对等的,比如`qq`中的好友。 + +> `GraphX`中有一个重要概念,所有的边都有一个方向,那么图就是有向图,如果忽略边的方向,就是无向图。 + +**有环图和无环图** +有环图是包含循环的,一系列顶点构成一个环。无环图没有环。如果不关心终止条件,算法可能永远在环上执行,无法退出。 + +![avatar](/image/环图.png) + +**其他常用术语** + +- 度:表示一个顶点的所有边的数量; +- 出边:指从当前顶点指向其他顶点的边; +- 入边:表示其他顶点指向当前顶点的边; +- 出度:一个顶点出边的数量; +- 入度:一个顶点入边的数量; +- 超步:图进行迭代计算时,每一轮的迭代叫做一个超步。 + + +### 4.1.3 Spark Graphx 定义图结构 +假设我们要使用`Graphx`项目构造一个由军事补给站组成的属性图。顶点属性包含`ID`和补给站名称,各顶点间的关系由线路构成。生成其属性图关系如下: + +![avatar](/image/补给图.png) + +顶点属性数据: +``` +id name +1L, "南马补给站" +2L, "多贝尔补给站" +3L, "安其补给站" +4L, "雪山补给站" +5L, "终极火力补给站" +6L, "末日补给站" +7L, "英迪补给站" +8L, "远洋补给站" +``` + +各顶点间的关系数据: +``` +id1 id2 distance +1L, 2L, 2.1 +2L, 3L, 2.2 +3L, 4L, 4.4 +1L, 5L, 3.4 +5L, 4L, 1.1 +4L, 8L, 2.3 +1L, 8L, 6.5 +3L, 7L, 3.4 +7L, 5L, 8.5 +2L, 6L,3.3 +6L, 7L, 3.2 +7L, 8L,2.2 +``` +> 感兴趣的小伙伴可以将数据存储到本地文本文件中读取! + +有了顶点属性与顶点关系数据后,接下来我们就可以使用`Graphx`构建图结构了。示例代码如下: + +``` +// 设置运行环境 + val sparkConf = new SparkConf().setAppName("GraphxDemo").set("spark.master", "local[*]") +val sc = new SparkContext(sparkConf) + +// 设置顶点和边,价格顶点和边构造为RDD +// 顶点的数据类型是VD:String +val vertices: RDD[(VertexId, String)] = sc.parallelize(List( + (1L, "南马补给站"), + (2L, "多贝尔补给站"), + (3L, "安其补给站"), + (4L, "雪山补给站"), + (5L, "终极火力补给站"), + (6L, "末日补给站"), + (7L, "英迪补给站"), + (8L, "远洋补给站") + )) +// 边的数据类型 ED:Double +val edges: RDD[Edge[Double]] = sc.parallelize(List( + Edge(1L, 2L, 2.1), + Edge(2L, 3L, 2.2), + Edge(3L, 4L, 4.4), + Edge(1L, 5L, 3.4), + Edge(5L, 4L, 1.1), + Edge(4L, 8L, 2.3), + Edge(1L, 8L, 6.5), + Edge(3L, 7L, 3.4), + Edge(7L, 5L, 8.5), + Edge(2L, 6L,3.3), + Edge(6L, 7L, 3.2), + Edge(7L, 8L,2.2) + )) +// 构造图 Graph[VD,ED] +val graph = Graph(vertices, edges) +``` +属性演示: +``` +// 打印顶点属性 +graph.vertices.foreach(println) + +输出: +(1,南马补给站) +(5,终极火力补给站) +(7,英迪补给站) +.... + +// 打印顶点关系 +graph.edges.foreach(println) + +输出: +Edge(3,7,3.4) +Edge(1,5,3.4) +Edge(7,5,8.5) +Edge(2,6,3.3) +... +// 使用三元组视图来呈现顶点与目标顶点之间关系 + +graph.triplets.foreach(println) + +输出: +((5,终极火力补给站),(4,雪山补给站),1.1) +((2,多贝尔补给站),(3,安其补给站),2.2) +((3,安其补给站),(4,雪山补给站),4.4) +... + +// 出度、入度、度数 + +//定义一个求最大值的函数(比较度数),VertexId:顶点,Int:度数 +def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={ + if(a._2>b._2)a else b +} + +println("最大的出度:"+graph.outDegrees.reduce(max)) +println("最大的入度:"+graph.inDegrees.reduce(max)) +println("最大的度数:"+graph.degrees.reduce(max)) + +输出: +最大的出度:(1,3) +最大的入度:(8,3) +最大的度数:(7,4) + +// 边的转换操作 +// 实现对各顶点间的的路径加10(也可以对顶点操作) +graph.mapEdges(_.attr+10).edges.foreach(println) + +输出: +Edge(4,8,12.3) +Edge(1,8,16.5) +Edge(1,2,12.1) +... + +``` \ No newline at end of file diff --git a/chapter4/4.1SparkGraphX计算生成最短路径.md b/chapter4/4.1SparkGraphX计算生成最短路径.md deleted file mode 100644 index e69de29..0000000 diff --git a/chapter4/4.2SparkGraphX计算生成最短路径.md b/chapter4/4.2SparkGraphX计算生成最短路径.md new file mode 100644 index 0000000..5e5bb0c --- /dev/null +++ b/chapter4/4.2SparkGraphX计算生成最短路径.md @@ -0,0 +1,204 @@ +## 4.2 Spark GraphX计算生成最短路径 +### 4.2.1 pregel函数的使用 +上一章节中,我们学习了如何使用`Graphx`定义图结构。这一章节,我们来学习如何使用`Graphx`计算生成最短路径。在编写代码之前,首先我们要学习计算生成最短路径的相关知识。 + +在学习接下来的知识之前,我们首先要了解有关顶点的相关知识: + +顶点的状态有两种: +- 钝化态【类似于休眠,不做任何事】; +- 激活态【干活】。 + +顶点能够处于激活态需要有条件: +- 成功收到消息或者成功发送了任何一条消息。 + + +**`pregel`函数参数说明** + +`Pregel`是个强大的基于图的迭代算法,也是`Spark中`的一个迭代应用`aggregateMessage`的典型案例,用它可以在图中方便的迭代计算,如最短路径、关键路径、`n`度关系等。然而对于之前对图计算接触不多的童鞋来说,这个`api`还算是一个比较重量组的接口,不太容易理解。 + + +`pregel`函数源码: +``` + def pregel[A: ClassTag]( + initialMsg: A, + maxIterations: Int = Int.MaxValue, + activeDirection: EdgeDirection = EdgeDirection.Either)( + vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { + Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) + } +``` + + +参数说明: + +| 参数 | 说明 | +| ---- | ---- | +| initialMsg | 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息 | +| maxIterations | 最大迭代次数 | +| activeDirection | 规定了发送消息的方向 | +| vprog | 节点调用该消息将聚合后的数据和本节点进行属性的合并 | +| sendMsg | 激活态的节点调用该方法发送消息 | +| mergeMsg | 如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数 | + + +**pregel原理分析** + + +了解`pregel`的参数后,接下来我们对其原理进行探索吧! + + +**Pregel算法的大体过程(最短路径计算)** + +1.调用`pregel`方法之前,首先初始化所有点的值(代表这个点到源点的最短距离),一般源点设置距离为`0`,其他顶点都设为正无穷大`Double.PositiveInfinity`; + +2.接着源点被激活,发送消息(一般消息为自身的值)给相邻的点; + +3.相邻点接收到信息,将消息中的值与自身的值做比较,(一般是比较消息值与边的和 和 自身的值谁更小),决定是否更改自身的值,每个更改过值的点将被激活; + +4.被激活的向自身周围的点发送消息; + +5.重复`3`和`4`步,直到迭代到达最大次数或没有存在活跃的点为止。 + + + +## 4.2.2 使用 GraphX 计算生成最短路径 +之前我们已经使用`GraphX`定义了图结构。假设我们的总站点为`1`号站点,现想从总站点派出`7`辆补给车运送军用物资给其它补给站点,需要我们对其道路进行规划,计算如何从起始点到目的地以最短距离运送我们的军用物资。(参照上一章节图结构) + +![avatar](/image/补给图.png) + +示例代码如下: + +``` +//起始顶点id +val srcVertexId = 1L +// 调用`pregel`方法之前,首先初始化所有点的值。 +val initialGraph = graph.mapVertices { case (id, (name)) => if (id == srcVertexId) (new ArrayBuffer[Long](),0.0) else (new ArrayBuffer[Long](),Double.PositiveInfinity) } + +initialGraph.vertices.foreach(println) // 后面操作中删除 +// ArrayBuffer 中计算起始点到目标点的行走线路,Infinity 计算最短距离 +输出: +(2,(ArrayBuffer(),Infinity)) +(6,(ArrayBuffer(),Infinity)) +(4,(ArrayBuffer(),Infinity)) +(7,(ArrayBuffer(),Infinity)) +(3,(ArrayBuffer(),Infinity)) +(8,(ArrayBuffer(),Infinity)) +(1,(ArrayBuffer(),0.0)) + +initialGraph.pregel( + (new ArrayBuffer[Long](),Double.PositiveInfinity), // 定义图初始化时,开始模型计算的时候,所有节点都会先收到的消息 + Int.MaxValue, // 最大迭代次数 + EdgeDirection.Out // 发送消息的方向。在下一轮迭代时,只有接收消息的出边(src—>dst)才会执行sendMsg函数 + )((VertexId, VD, a) =>{ + // 节点调用该消息将聚合后的数据和本节点进行属性的合并 + var array = new ArrayBuffer[Long]() + // 如果为第一次调用则将顶点添加到array中,否则将聚合后的数据添加到array中 + if (a._1.isEmpty){ + array += VertexId + }else{ + array ++= a._1 + } + // 发送合并后的数据 + math.min(VD._2,a._2) match { + case VD._2 => (array,VD._2) + case a._2 => (array,a._2) + } + },(edgeTriplet: EdgeTriplet[(ArrayBuffer[Long],Double),Double]) => { + // 激活态的节点调用该方法发送消息 + // 将上一个节点和当前节点的属性添加到array中。可以理解将迭代的节点都添加到array中 + var array = new ArrayBuffer[Long]() + println(edgeTriplet.srcAttr._1 +"|"+edgeTriplet.dstAttr._1) + array ++= edgeTriplet.srcAttr._1 + array ++= edgeTriplet.dstAttr._1 + // 相邻点接收到信息,将消息中的值与自身的值做比较,决定是否更改自身的值,每个更改过值的点将被激活 + if(edgeTriplet.srcAttr._2 + edgeTriplet.attr < edgeTriplet.dstAttr._2) { + Iterator((edgeTriplet.dstId, (array,edgeTriplet.srcAttr._2 + edgeTriplet.attr ))) + }else{ + // 空迭代器 + Iterator.empty + } + } + ,(x,y) =>{ + // 如果一个节点接收到多条消息,先用mergeMsg来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数 + // 这里我计算的最小值 + math.min(x._2,y._2) match { + case x._2 => x + case y._2 => y + } + } + // 使用过滤器过滤掉顶点1,源点1给1运输没意义 + ).vertices.filter(_._1!= 1L).foreach(x =>{ + // 遍历数据集 + val value = x._2._1 + println(s"从总补给点“${value(0)}”前往目标补给点“${value(value.size-1)}”配送物资的最近距离为“${x._2._2.formatted("%.1f")}”km,其行走线路为:${value.mkString("->")}") + }) + +输出: + +从总补给点“1”前往目标补给点“7”配送物资的最近距离为“7.7”km,其行走线路为:1->2->3->7 +从总补给点“1”前往目标补给点“3”配送物资的最近距离为“4.3”km,其行走线路为:1->2->3 +从总补给点“1”前往目标补给点“4”配送物资的最近距离为“4.5”km,其行走线路为:1->5->4 +从总补给点“1”前往目标补给点“2”配送物资的最近距离为“2.1”km,其行走线路为:1->2 +从总补给点“1”前往目标补给点“5”配送物资的最近距离为“3.4”km,其行走线路为:1->5 +从总补给点“1”前往目标补给点“8”配送物资的最近距离为“6.5”km,其行走线路为:1->8 +从总补给点“1”前往目标补给点“6”配送物资的最近距离为“5.4”km,其行走线路为:1->2->6 + +``` + +## 4.2.3 使用 Python GraphFrames定义了图结构 +由于`GraphX`基于`RDD API`,不支持`Python API`,那么我们想用`Python`做图计算的时候该怎么办呢?这时候,我们可以使用 `GraphFrames`定义图结构。该类库是构建在`DataFrame`之上,它既能利用`DataFrame`良好的扩展性和强大的性能,同时也为`Scala`、`Java`和`Python提`供了统一的图处理`API`。 + +示例代码如下: +``` +from pyspark.context import SparkContext +from pyspark.sql import SQLContext +from pyspark.context import SparkConf +from graphframes import * + +conf=SparkConf().setAppName("test1").setMaster("local[*]") +sc=SparkContext(conf=conf) +sqlContext = SQLContext(sc) +# Vertex DataFrame +v = sqlContext.createDataFrame([ + (1, "南马补给站"), + (2, "多贝尔补给站"), + (3, "安其补给站"), + (4, "雪山补给站"), + (5, "终极火力补给站"), + (6, "末日补给站"), + (7, "英迪补给站"), + (8, "远洋补给站") +], ["id", "name"]) + +# Edge DataFrame +e = sqlContext.createDataFrame([ + (1,2,2.1), + (2,3,2.2), + (3,4,4.4), + (1,5,3.4), + (5,4,1.1), + (4,8,2.3), + (1,8,16.5), + (3,7,3.4), + (7,5,8.5), + (2,6,3.3), + (6,7,3.2), + (7,8,2.2) + +], ["src","dst","distance"]) + +# 创建 GraphFrame +g = GraphFrame(v, e) +# 广度优先搜索(BFS)查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。将起点和终点指定为Spark DataFrame表达式。 +g.bfs("id = 1","id = 8").show() + +输出: ++----------+------------+----------+ +| from| e0| to| ++----------+------------+----------+ +|[1, 南马补给站]|[1, 8, 16.5]|[8, 远洋补给站]| ++----------+------------+----------+ +``` \ No newline at end of file diff --git a/chapter4/4Spark图数据计算简介.md b/chapter4/4Spark图数据计算简介.md index e69de29..def8736 100644 --- a/chapter4/4Spark图数据计算简介.md +++ b/chapter4/4Spark图数据计算简介.md @@ -0,0 +1,7 @@ +# 第四章:Spark图数据计算简介 +`GraphX`是`Spark`的分布式图处理框架。`GraphX`通过引入属性图:顶点和边均有属性的有向多重图,来扩充`Spark`的`RDD`。为了支持这种图计算,`GraphX`开发了一组基础功能操作。目前仍在不断扩充图算法,用来简化图计算的分析任务。 + +![avatar](/image/GraphX.png) + + +本章节主要介绍`GraphX`的核心抽象模型—属性图,并通过实例介绍如何构造一个图、如何使用图计算计算最短路径。 diff --git a/image/Graphx.png b/image/Graphx.png new file mode 100644 index 0000000..4a30f77 Binary files /dev/null and b/image/Graphx.png differ diff --git a/image/环图.png b/image/环图.png new file mode 100644 index 0000000..47a04fd Binary files /dev/null and b/image/环图.png differ diff --git a/image/补给图.png b/image/补给图.png new file mode 100644 index 0000000..fb03937 Binary files /dev/null and b/image/补给图.png differ