aboutsummaryrefslogtreecommitdiff
path: root/graph/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-12-05 23:25:53 -0800
committerReynold Xin <rxin@apache.org>2013-12-05 23:25:53 -0800
commit15168d6c4dab8f5debf6d3935035fbdb923b157b (patch)
tree688c4911deed46d47b84138f39d71132d64459d4 /graph/src/test
parenta6075ba11f000195cfbb3d0ca8af1e9d464f461e (diff)
downloadspark-15168d6c4dab8f5debf6d3935035fbdb923b157b.tar.gz
spark-15168d6c4dab8f5debf6d3935035fbdb923b157b.tar.bz2
spark-15168d6c4dab8f5debf6d3935035fbdb923b157b.zip
Fixed a bug in VTableReplicated that we are always broadcasting all the vertices.
Diffstat (limited to 'graph/src/test')
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala14
1 files changed, 10 insertions, 4 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index fa4ebf3c88..514d20b76c 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -159,14 +159,20 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v1").cache()
// Modify only vertices whose vids are even
- val newVerts = star.vertices.mapValues((vid, attr) => if (vid % 2 == 0) "v2" else attr)
- val changedVerts = star.vertices.diff(newVerts)
+ val changedVerts = star.vertices.filter(_._1 % 2 == 0).mapValues((vid, attr) => "v2")
// Apply the modification to the graph
- val changedStar = star.deltaJoinVertices(newVerts, changedVerts)
+ val changedStar = star.deltaJoinVertices(changedVerts)
+
+ val newVertices = star.vertices.leftZipJoin(changedVerts) { (vid, oldVd, newVdOpt) =>
+ newVdOpt match {
+ case Some(newVd) => newVd
+ case None => oldVd
+ }
+ }
// The graph's vertices should be correct
- assert(changedStar.vertices.collect().toSet === newVerts.collect().toSet)
+ assert(changedStar.vertices.collect().toSet === newVertices.collect().toSet)
// Send the leaf attributes to the center
val sums = changedStar.mapReduceTriplets(