diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-05 16:16:04 +0800 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-05 16:16:04 +0800 |
commit | 822d9c5b70f3ad8fad1bb23075d29246ff31a38d (patch) | |
tree | bd2ab813c1f3fe0e5ddc9409a02998f126e99d15 /graph | |
parent | d40c1d51229070660cb0c2f0d8fd04954c9d8ab7 (diff) | |
download | spark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.tar.gz spark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.tar.bz2 spark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.zip |
Rename rawGraph to graph in Pregel.
Diffstat (limited to 'graph')
-rw-r--r-- | graph/src/main/scala/spark/graph/Pregel.scala | 17 | ||||
-rw-r--r-- | graph/src/test/scala/spark/graph/GraphSuite.scala | 5 |
2 files changed, 12 insertions, 10 deletions
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala index 1011e3bf12..4bd8810634 100644 --- a/graph/src/main/scala/spark/graph/Pregel.scala +++ b/graph/src/main/scala/spark/graph/Pregel.scala @@ -6,35 +6,32 @@ import spark.RDD object Pregel { - def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]( - rawGraph: Graph[VD, ED])( + def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])( vprog: ( Vertex[VD], A) => VD, sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A], mergeMsg: (A, A) => A, initialMsg: A, numIter: Int) : Graph[VD, ED] = { - var graph = rawGraph.cache + var g = graph.cache var i = 0 def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) = sendMsg(edge.otherVertex(vid).id, edge) - var msgs: RDD[(Vid, A)] = graph.vertices.map{ v => (v.id, initialMsg) } + var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) } while (i < numIter) { - def runProg(v: Vertex[VD], msg: Option[A]): VD = - if(msg.isEmpty) v.data else vprog(v, msg.get) + def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get) - graph = graph.updateVertices(msgs, runProg).cache() + g = g.updateVertices(msgs, runProg).cache() - msgs = graph.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In) + msgs = g.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In) i += 1 } - graph - + g } diff --git a/graph/src/test/scala/spark/graph/GraphSuite.scala b/graph/src/test/scala/spark/graph/GraphSuite.scala index 11b3dd1298..64a7aa063b 100644 --- a/graph/src/test/scala/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/spark/graph/GraphSuite.scala @@ -37,5 +37,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data) assert(g.numVertexPartitions === 5) assert(g.numEdgePartitions === 8) + + g = g.reverse + assert(g.numVertexPartitions === 5) + assert(g.numEdgePartitions === 8) + } } |