aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala10
2 files changed, 11 insertions, 1 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index c2b510a31e..9eabccdee4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def reverse: Graph[VD, ED] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
- new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+ GraphImpl(vertices, newETable)
}
override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c65e36636f..d9ba4672ce 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("reverse with join elimination") {
+ withSpark { sc =>
+ val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
+ val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
+ val graph = Graph(vertices, edges).reverse
+ val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ assert(result.collect.toSet === Set((1L, 2)))
+ }
+ }
+
test("subgraph") {
withSpark { sc =>
// Create a star graph of 10 veritces.