|
|
|
@ -0,0 +1,204 @@
|
|
|
|
|
## 4.2 Spark GraphX计算生成最短路径
|
|
|
|
|
### 4.2.1 pregel函数的使用
|
|
|
|
|
上一章节中,我们学习了如何使用`Graphx`定义图结构。这一章节,我们来学习如何使用`Graphx`计算生成最短路径。在编写代码之前,首先我们要学习计算生成最短路径的相关知识。
|
|
|
|
|
|
|
|
|
|
在学习接下来的知识之前,我们首先要了解有关顶点的相关知识:
|
|
|
|
|
|
|
|
|
|
顶点的状态有两种:
|
|
|
|
|
- 钝化态【类似于休眠,不做任何事】;
|
|
|
|
|
- 激活态【干活】。
|
|
|
|
|
|
|
|
|
|
顶点能够处于激活态需要有条件:
|
|
|
|
|
- 成功收到消息或者成功发送了任何一条消息。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**`pregel`函数参数说明**
|
|
|
|
|
|
|
|
|
|
`Pregel`是个强大的基于图的迭代算法,也是`Spark中`的一个迭代应用`aggregateMessage`的典型案例,用它可以在图中方便的迭代计算,如最短路径、关键路径、`n`度关系等。然而对于之前对图计算接触不多的童鞋来说,这个`api`还算是一个比较重量组的接口,不太容易理解。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
`pregel`函数源码:
|
|
|
|
|
```
|
|
|
|
|
def pregel[A: ClassTag](
|
|
|
|
|
initialMsg: A,
|
|
|
|
|
maxIterations: Int = Int.MaxValue,
|
|
|
|
|
activeDirection: EdgeDirection = EdgeDirection.Either)(
|
|
|
|
|
vprog: (VertexId, VD, A) => VD,
|
|
|
|
|
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
|
|
|
|
|
mergeMsg: (A, A) => A)
|
|
|
|
|
: Graph[VD, ED] = {
|
|
|
|
|
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
参数说明:
|
|
|
|
|
|
|
|
|
|
| 参数 | 说明 |
|
|
|
|
|
| ---- | ---- |
|
|
|
|
|
| initialMsg | 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息 |
|
|
|
|
|
| maxIterations | 最大迭代次数 |
|
|
|
|
|
| activeDirection | 规定了发送消息的方向 |
|
|
|
|
|
| vprog | 节点调用该消息将聚合后的数据和本节点进行属性的合并 |
|
|
|
|
|
| sendMsg | 激活态的节点调用该方法发送消息 |
|
|
|
|
|
| mergeMsg | 如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**pregel原理分析**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
了解`pregel`的参数后,接下来我们对其原理进行探索吧!
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**Pregel算法的大体过程(最短路径计算)**
|
|
|
|
|
|
|
|
|
|
1.调用`pregel`方法之前,首先初始化所有点的值(代表这个点到源点的最短距离),一般源点设置距离为`0`,其他顶点都设为正无穷大`Double.PositiveInfinity`;
|
|
|
|
|
|
|
|
|
|
2.接着源点被激活,发送消息(一般消息为自身的值)给相邻的点;
|
|
|
|
|
|
|
|
|
|
3.相邻点接收到信息,将消息中的值与自身的值做比较,(一般是比较消息值与边的和 和 自身的值谁更小),决定是否更改自身的值,每个更改过值的点将被激活;
|
|
|
|
|
|
|
|
|
|
4.被激活的向自身周围的点发送消息;
|
|
|
|
|
|
|
|
|
|
5.重复`3`和`4`步,直到迭代到达最大次数或没有存在活跃的点为止。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 4.2.2 使用 GraphX 计算生成最短路径
|
|
|
|
|
之前我们已经使用`GraphX`定义了图结构。假设我们的总站点为`1`号站点,现想从总站点派出`7`辆补给车运送军用物资给其它补给站点,需要我们对其道路进行规划,计算如何从起始点到目的地以最短距离运送我们的军用物资。(参照上一章节图结构)
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
示例代码如下:
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
//起始顶点id
|
|
|
|
|
val srcVertexId = 1L
|
|
|
|
|
// 调用`pregel`方法之前,首先初始化所有点的值。
|
|
|
|
|
val initialGraph = graph.mapVertices { case (id, (name)) => if (id == srcVertexId) (new ArrayBuffer[Long](),0.0) else (new ArrayBuffer[Long](),Double.PositiveInfinity) }
|
|
|
|
|
|
|
|
|
|
initialGraph.vertices.foreach(println) // 后面操作中删除
|
|
|
|
|
// ArrayBuffer 中计算起始点到目标点的行走线路,Infinity 计算最短距离
|
|
|
|
|
输出:
|
|
|
|
|
(2,(ArrayBuffer(),Infinity))
|
|
|
|
|
(6,(ArrayBuffer(),Infinity))
|
|
|
|
|
(4,(ArrayBuffer(),Infinity))
|
|
|
|
|
(7,(ArrayBuffer(),Infinity))
|
|
|
|
|
(3,(ArrayBuffer(),Infinity))
|
|
|
|
|
(8,(ArrayBuffer(),Infinity))
|
|
|
|
|
(1,(ArrayBuffer(),0.0))
|
|
|
|
|
|
|
|
|
|
initialGraph.pregel(
|
|
|
|
|
(new ArrayBuffer[Long](),Double.PositiveInfinity), // 定义图初始化时,开始模型计算的时候,所有节点都会先收到的消息
|
|
|
|
|
Int.MaxValue, // 最大迭代次数
|
|
|
|
|
EdgeDirection.Out // 发送消息的方向。在下一轮迭代时,只有接收消息的出边(src—>dst)才会执行sendMsg函数
|
|
|
|
|
)((VertexId, VD, a) =>{
|
|
|
|
|
// 节点调用该消息将聚合后的数据和本节点进行属性的合并
|
|
|
|
|
var array = new ArrayBuffer[Long]()
|
|
|
|
|
// 如果为第一次调用则将顶点添加到array中,否则将聚合后的数据添加到array中
|
|
|
|
|
if (a._1.isEmpty){
|
|
|
|
|
array += VertexId
|
|
|
|
|
}else{
|
|
|
|
|
array ++= a._1
|
|
|
|
|
}
|
|
|
|
|
// 发送合并后的数据
|
|
|
|
|
math.min(VD._2,a._2) match {
|
|
|
|
|
case VD._2 => (array,VD._2)
|
|
|
|
|
case a._2 => (array,a._2)
|
|
|
|
|
}
|
|
|
|
|
},(edgeTriplet: EdgeTriplet[(ArrayBuffer[Long],Double),Double]) => {
|
|
|
|
|
// 激活态的节点调用该方法发送消息
|
|
|
|
|
// 将上一个节点和当前节点的属性添加到array中。可以理解将迭代的节点都添加到array中
|
|
|
|
|
var array = new ArrayBuffer[Long]()
|
|
|
|
|
println(edgeTriplet.srcAttr._1 +"|"+edgeTriplet.dstAttr._1)
|
|
|
|
|
array ++= edgeTriplet.srcAttr._1
|
|
|
|
|
array ++= edgeTriplet.dstAttr._1
|
|
|
|
|
// 相邻点接收到信息,将消息中的值与自身的值做比较,决定是否更改自身的值,每个更改过值的点将被激活
|
|
|
|
|
if(edgeTriplet.srcAttr._2 + edgeTriplet.attr < edgeTriplet.dstAttr._2) {
|
|
|
|
|
Iterator((edgeTriplet.dstId, (array,edgeTriplet.srcAttr._2 + edgeTriplet.attr )))
|
|
|
|
|
}else{
|
|
|
|
|
// 空迭代器
|
|
|
|
|
Iterator.empty
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
,(x,y) =>{
|
|
|
|
|
// 如果一个节点接收到多条消息,先用mergeMsg来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数
|
|
|
|
|
// 这里我计算的最小值
|
|
|
|
|
math.min(x._2,y._2) match {
|
|
|
|
|
case x._2 => x
|
|
|
|
|
case y._2 => y
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// 使用过滤器过滤掉顶点1,源点1给1运输没意义
|
|
|
|
|
).vertices.filter(_._1!= 1L).foreach(x =>{
|
|
|
|
|
// 遍历数据集
|
|
|
|
|
val value = x._2._1
|
|
|
|
|
println(s"从总补给点“${value(0)}”前往目标补给点“${value(value.size-1)}”配送物资的最近距离为“${x._2._2.formatted("%.1f")}”km,其行走线路为:${value.mkString("->")}")
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
输出:
|
|
|
|
|
|
|
|
|
|
从总补给点“1”前往目标补给点“7”配送物资的最近距离为“7.7”km,其行走线路为:1->2->3->7
|
|
|
|
|
从总补给点“1”前往目标补给点“3”配送物资的最近距离为“4.3”km,其行走线路为:1->2->3
|
|
|
|
|
从总补给点“1”前往目标补给点“4”配送物资的最近距离为“4.5”km,其行走线路为:1->5->4
|
|
|
|
|
从总补给点“1”前往目标补给点“2”配送物资的最近距离为“2.1”km,其行走线路为:1->2
|
|
|
|
|
从总补给点“1”前往目标补给点“5”配送物资的最近距离为“3.4”km,其行走线路为:1->5
|
|
|
|
|
从总补给点“1”前往目标补给点“8”配送物资的最近距离为“6.5”km,其行走线路为:1->8
|
|
|
|
|
从总补给点“1”前往目标补给点“6”配送物资的最近距离为“5.4”km,其行走线路为:1->2->6
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 4.2.3 使用 Python GraphFrames定义了图结构
|
|
|
|
|
由于`GraphX`基于`RDD API`,不支持`Python API`,那么我们想用`Python`做图计算的时候该怎么办呢?这时候,我们可以使用 `GraphFrames`定义图结构。该类库是构建在`DataFrame`之上,它既能利用`DataFrame`良好的扩展性和强大的性能,同时也为`Scala`、`Java`和`Python提`供了统一的图处理`API`。
|
|
|
|
|
|
|
|
|
|
示例代码如下:
|
|
|
|
|
```
|
|
|
|
|
from pyspark.context import SparkContext
|
|
|
|
|
from pyspark.sql import SQLContext
|
|
|
|
|
from pyspark.context import SparkConf
|
|
|
|
|
from graphframes import *
|
|
|
|
|
|
|
|
|
|
conf=SparkConf().setAppName("test1").setMaster("local[*]")
|
|
|
|
|
sc=SparkContext(conf=conf)
|
|
|
|
|
sqlContext = SQLContext(sc)
|
|
|
|
|
# Vertex DataFrame
|
|
|
|
|
v = sqlContext.createDataFrame([
|
|
|
|
|
(1, "南马补给站"),
|
|
|
|
|
(2, "多贝尔补给站"),
|
|
|
|
|
(3, "安其补给站"),
|
|
|
|
|
(4, "雪山补给站"),
|
|
|
|
|
(5, "终极火力补给站"),
|
|
|
|
|
(6, "末日补给站"),
|
|
|
|
|
(7, "英迪补给站"),
|
|
|
|
|
(8, "远洋补给站")
|
|
|
|
|
], ["id", "name"])
|
|
|
|
|
|
|
|
|
|
# Edge DataFrame
|
|
|
|
|
e = sqlContext.createDataFrame([
|
|
|
|
|
(1,2,2.1),
|
|
|
|
|
(2,3,2.2),
|
|
|
|
|
(3,4,4.4),
|
|
|
|
|
(1,5,3.4),
|
|
|
|
|
(5,4,1.1),
|
|
|
|
|
(4,8,2.3),
|
|
|
|
|
(1,8,16.5),
|
|
|
|
|
(3,7,3.4),
|
|
|
|
|
(7,5,8.5),
|
|
|
|
|
(2,6,3.3),
|
|
|
|
|
(6,7,3.2),
|
|
|
|
|
(7,8,2.2)
|
|
|
|
|
|
|
|
|
|
], ["src","dst","distance"])
|
|
|
|
|
|
|
|
|
|
# 创建 GraphFrame
|
|
|
|
|
g = GraphFrame(v, e)
|
|
|
|
|
# 广度优先搜索(BFS)查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。将起点和终点指定为Spark DataFrame表达式。
|
|
|
|
|
g.bfs("id = 1","id = 8").show()
|
|
|
|
|
|
|
|
|
|
输出:
|
|
|
|
|
+----------+------------+----------+
|
|
|
|
|
| from| e0| to|
|
|
|
|
|
+----------+------------+----------+
|
|
|
|
|
|[1, 南马补给站]|[1, 8, 16.5]|[8, 远洋补给站]|
|
|
|
|
|
+----------+------------+----------+
|
|
|
|
|
```
|