|
|
|
@ -110,9 +110,12 @@ initialGraph.pregel(
|
|
|
|
|
// 激活态的节点调用该方法发送消息
|
|
|
|
|
// 将上一个节点和当前节点的属性添加到array中。可以理解将迭代的节点都添加到array中
|
|
|
|
|
var array = new ArrayBuffer[Long]()
|
|
|
|
|
println(edgeTriplet.srcAttr._1 +"|"+edgeTriplet.dstAttr._1)
|
|
|
|
|
array ++= edgeTriplet.srcAttr._1
|
|
|
|
|
array ++= edgeTriplet.dstAttr._1
|
|
|
|
|
val dstarr = edgeTriplet.dstAttr._1
|
|
|
|
|
if (dstarr.length>1)
|
|
|
|
|
array +=dstarr(dstarr.length-1)
|
|
|
|
|
else
|
|
|
|
|
array ++= edgeTriplet.dstAttr._1
|
|
|
|
|
// 相邻点接收到信息,将消息中的值与自身的值做比较,决定是否更改自身的值,每个更改过值的点将被激活
|
|
|
|
|
if(edgeTriplet.srcAttr._2 + edgeTriplet.attr < edgeTriplet.dstAttr._2) {
|
|
|
|
|
Iterator((edgeTriplet.dstId, (array,edgeTriplet.srcAttr._2 + edgeTriplet.attr )))
|
|
|
|
|