aboutsummaryrefslogtreecommitdiff
path: root/graph
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-04-05 16:16:04 +0800
committerReynold Xin <rxin@cs.berkeley.edu>2013-04-05 16:16:04 +0800
commit822d9c5b70f3ad8fad1bb23075d29246ff31a38d (patch)
treebd2ab813c1f3fe0e5ddc9409a02998f126e99d15 /graph
parentd40c1d51229070660cb0c2f0d8fd04954c9d8ab7 (diff)
downloadspark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.tar.gz
spark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.tar.bz2
spark-822d9c5b70f3ad8fad1bb23075d29246ff31a38d.zip
Rename rawGraph to graph in Pregel.
Diffstat (limited to 'graph')
-rw-r--r--graph/src/main/scala/spark/graph/Pregel.scala17
-rw-r--r--graph/src/test/scala/spark/graph/GraphSuite.scala5
2 files changed, 12 insertions, 10 deletions
diff --git a/graph/src/main/scala/spark/graph/Pregel.scala b/graph/src/main/scala/spark/graph/Pregel.scala
index 1011e3bf12..4bd8810634 100644
--- a/graph/src/main/scala/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/spark/graph/Pregel.scala
@@ -6,35 +6,32 @@ import spark.RDD
object Pregel {
- def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
- rawGraph: Graph[VD, ED])(
+ def iterate[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](graph: Graph[VD, ED])(
vprog: ( Vertex[VD], A) => VD,
sendMsg: (Vid, EdgeWithVertices[VD, ED]) => Option[A],
mergeMsg: (A, A) => A,
initialMsg: A,
numIter: Int) : Graph[VD, ED] = {
- var graph = rawGraph.cache
+ var g = graph.cache
var i = 0
def reverseGather(vid: Vid, edge: EdgeWithVertices[VD,ED]) =
sendMsg(edge.otherVertex(vid).id, edge)
- var msgs: RDD[(Vid, A)] = graph.vertices.map{ v => (v.id, initialMsg) }
+ var msgs: RDD[(Vid, A)] = g.vertices.map{ v => (v.id, initialMsg) }
while (i < numIter) {
- def runProg(v: Vertex[VD], msg: Option[A]): VD =
- if(msg.isEmpty) v.data else vprog(v, msg.get)
+ def runProg(v: Vertex[VD], msg: Option[A]): VD = if(msg.isEmpty) v.data else vprog(v, msg.get)
- graph = graph.updateVertices(msgs, runProg).cache()
+ g = g.updateVertices(msgs, runProg).cache()
- msgs = graph.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In)
+ msgs = g.flatMapReduceNeighborhood(reverseGather, mergeMsg, EdgeDirection.In)
i += 1
}
- graph
-
+ g
}
diff --git a/graph/src/test/scala/spark/graph/GraphSuite.scala b/graph/src/test/scala/spark/graph/GraphSuite.scala
index 11b3dd1298..64a7aa063b 100644
--- a/graph/src/test/scala/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/spark/graph/GraphSuite.scala
@@ -37,5 +37,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
(v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
assert(g.numVertexPartitions === 5)
assert(g.numEdgePartitions === 8)
+
+ g = g.reverse
+ assert(g.numVertexPartitions === 5)
+ assert(g.numEdgePartitions === 8)
+
}
}