author     Ankur Dave <ankurdave@gmail.com>    2014-05-26 16:10:22 -0700
committer  Reynold Xin <rxin@apache.org>       2014-05-26 16:10:22 -0700
commit     56c771cb2d00a5843c391ae6561536ee46e535d4 (patch)
tree       a6cf101ec4e203ec6b92f7217b1195b5d8660699 /graphx/src/test
parent     cb7fe5034826844f1b50fbe8b92646317b66f21c (diff)
[SPARK-1931] Reconstruct routing tables in Graph.partitionBy
905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in partitionBy where, after repartitioning the edges, it reuses the VertexRDD without updating the routing tables to reflect the new edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties.

This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and calling it in `partitionBy`.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #885 from ankurdave/SPARK-1931 and squashes the following commits:

3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins
9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy
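The fix itself lands in GraphImpl rather than in this test file. Below is a minimal sketch of its shape, not the committed code: `repartitionEdges` is a hypothetical stand-in for the existing edge-shuffling logic, and `fromExistingRDDs` is assumed to reassemble a graph from prebuilt vertex and edge RDDs; only `withEdges` and `partitionBy` are named by the commit message itself.

// Sketch only, not the committed implementation.
override def partitionBy(strategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
  // Shuffle each edge to the partition chosen by the strategy
  // (repartitionEdges is a hypothetical helper standing in for the
  // existing edge-partition builder logic).
  val newEdges = repartitionEdges(edges, strategy, numPartitions).cache()
  // The fix: derive a VertexRDD whose routing tables reflect newEdges,
  // instead of reusing `vertices`, whose tables describe the old layout.
  GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}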
Diffstat (limited to 'graphx/src/test')
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 10
1 file changed, 10 insertions(+), 0 deletions(-)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 7b9bac5d9c..abc25d0671 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
         Iterator((part.srcIds ++ part.dstIds).toSet)
       }.collect
       assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+
+      // Forming triplets view
+      val g = Graph(
+        sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
+        sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
+      assert(g.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
+      val gPart = g.partitionBy(EdgePartition2D)
+      assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+        Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
     }
   }
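For reference, the same check the new test performs can be run as a standalone program against the public GraphX API. This is a sketch assuming a local Spark master and the 1.0-era API; plain `==` stands in for ScalaTest's `===`.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

object Spark1931Check {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("SPARK-1931"))
    val expected = Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1))
    val g = Graph(
      sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
      sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
    // Materializing the triplets view builds the initial routing tables.
    assert(g.triplets.collect.map(_.toTuple).toSet == expected)
    // Before this fix, repartitioning left the routing tables stale and
    // many vertex attributes in the triplets came back null.
    val gPart = g.partitionBy(EdgePartition2D)
    assert(gPart.triplets.collect.map(_.toTuple).toSet == expected)
    sc.stop()
  }
}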