author     Ankur Dave <ankurdave@gmail.com>    2014-04-16 17:15:50 -0700
committer  Reynold Xin <rxin@apache.org>       2014-04-16 17:15:50 -0700
commit  235a47ce14b3c7523e79ce671355dea7ee06f4b7 (patch)
tree    977ccb414412c02f1934816cadd9061c84662046 /graphx/src/test
parent  987760ec0aa914995b742b234fc8663b74f5476f (diff)
Rebuild routing table after Graph.reverse
GraphImpl.reverse used to reverse the edges in each partition of the edge RDD but preserve the routing table and replicated vertex view, since reversing should not affect the partitioning. However, the old routing table would then have incorrect information for srcAttrOnly and dstAttrOnly: these two RDDs should be swapped. A simple fix is for Graph.reverse to rebuild the routing table and replicated vertex view.

Thanks to Bogdan Ghidireac for reporting this issue on the [mailing list](http://apache-spark-user-list.1001560.n3.nabble.com/graph-reverse-amp-Pregel-API-td4338.html).

Author: Ankur Dave <ankurdave@gmail.com>

Closes #431 from ankurdave/fix-reverse-bug and squashes the following commits:

75d63cb [Ankur Dave] Rebuild routing table after Graph.reverse
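The toy model below (plain Scala, not Spark's internal classes; the `Routing` case class and `buildRouting` helper are invented for illustration) sketches why the cached srcAttrOnly/dstAttrOnly information goes stale after a reverse: vertices that were referenced only as sources are afterwards referenced only as destinations, so the two sets must swap.

```scala
// Toy model of a routing table, NOT Spark's actual implementation.
case class Routing(srcOnly: Set[Long], dstOnly: Set[Long], both: Set[Long])

// Classify each vertex by which side(s) of an edge reference it.
def buildRouting(edges: Seq[(Long, Long)]): Routing = {
  val srcs = edges.map(_._1).toSet
  val dsts = edges.map(_._2).toSet
  Routing(srcs diff dsts, dsts diff srcs, srcs intersect dsts)
}

val edges    = Seq((1L, 2L))
val original = buildRouting(edges)             // Routing(Set(1), Set(2), Set())
val reversed = buildRouting(edges.map(_.swap)) // Routing(Set(2), Set(1), Set())

// Rebuilding after reverse swaps the two sets. Reusing `original` with the
// reversed edges would ship vertex 1's attribute as a source attribute even
// though vertex 1 is now referenced only as a destination.
assert(original.srcOnly == reversed.dstOnly && original.dstOnly == reversed.srcOnly)
```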
Diffstat (limited to 'graphx/src/test')
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala  10
1 file changed, 10 insertions(+), 0 deletions(-)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c65e36636f..d9ba4672ce 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("reverse with join elimination") {
+ withSpark { sc =>
+ val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
+ val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
+ val graph = Graph(vertices, edges).reverse
+ val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ assert(result.collect.toSet === Set((1L, 2)))
+ }
+ }
+
test("subgraph") {
withSpark { sc =>
// Create a star graph of 10 vertices.
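For reference, a quick plain-Scala trace (a toy check, independent of Spark) of why the new test expects Set((1L, 2)): the edge 1L -> 2L becomes 2L -> 1L after Graph.reverse, and mapReduceTriplets then sends each triplet's srcAttr to its dstId.

```scala
// Simulate the test's data flow without Spark (requires Scala 2.13 for
// groupMapReduce). This is a hand trace, not Spark's implementation.
val vertexAttr    = Map(1L -> 1, 2L -> 2)
val reversedEdges = Seq((1L, 2L)).map { case (src, dst) => (dst, src) }   // Graph.reverse
val messages      = reversedEdges.map { case (src, dst) => (dst, vertexAttr(src)) }
val result        = messages.groupMapReduce(_._1)(_._2)(_ + _)            // sum per dstId
assert(result == Map(1L -> 2)) // matches Set((1L, 2)) asserted in the test
```

Before the fix, the stale routing table could ship the attribute from the wrong side of the edge, as described in the commit message, so an assertion like this one would fail.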