about | summary | refs | log | tree | commit | diff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-05-26 16:10:22 -0700
committerReynold Xin <rxin@apache.org>2014-05-26 16:10:30 -0700
commitf268548df0b7315b97518914fcacd8c64cb39954 (patch)
treec2baa577466e60264546bd3d90e2684a4f3202b4 /graphx
parentf09cb8506c9f6a562b5749dcc7dacb701c016ad2 (diff)
downloadspark-f268548df0b7315b97518914fcacd8c64cb39954.tar.gz
spark-f268548df0b7315b97518914fcacd8c64cb39954.tar.bz2
spark-f268548df0b7315b97518914fcacd8c64cb39954.zip
[SPARK-1931] Reconstruct routing tables in Graph.partitionBy
905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in partitionBy where, after repartitioning the edges, it reuses the VertexRDD without updating the routing tables to reflect the new edge layout. Subsequent accesses of the triplets contain nulls for many vertex properties. This commit adds a test for this bug and fixes it by introducing `VertexRDD#withEdges` and calling it in `partitionBy`. Author: Ankur Dave <ankurdave@gmail.com> Closes #885 from ankurdave/SPARK-1931 and squashes the following commits: 3930cdd [Ankur Dave] Note how to set up VertexRDD for efficient joins 9bdbaa4 [Ankur Dave] [SPARK-1931] Reconstruct routing tables in Graph.partitionBy (cherry picked from commit 56c771cb2d00a5843c391ae6561536ee46e535d4) Signed-off-by: Reynold Xin <rxin@apache.org>
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala13
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala10
3 files changed, 31 insertions, 4 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8c62897037..8b910fbc5a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -300,6 +300,18 @@ class VertexRDD[@specialized VD: ClassTag](
def reverseRoutingTables(): VertexRDD[VD] =
this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
+ /** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
+ def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
+ val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
+ val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
+ (partIter, routingTableIter) =>
+ val routingTable =
+ if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
+ partIter.map(_.withRoutingTable(routingTable))
+ }
+ new VertexRDD(vertexPartitions)
+ }
+
/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
private[graphx] def shipVertexAttributes(
shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 2f2d0e03fd..1649b244d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -88,8 +88,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
- }, preservesPartitioning = true))
- GraphImpl.fromExistingRDDs(vertices, newEdges)
+ }, preservesPartitioning = true)).cache()
+ GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}
override def reverse: Graph[VD, ED] = {
@@ -277,7 +277,11 @@ object GraphImpl {
GraphImpl(vertexRDD, edgeRDD)
}
- /** Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. */
+ /**
+ * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
+ * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+ * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
+ */
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
@@ -290,7 +294,8 @@ object GraphImpl {
/**
* Create a graph from a VertexRDD and an EdgeRDD with the same replicated vertex type as the
- * vertices.
+ * vertices. The VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
+ * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
*/
def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 7b9bac5d9c..abc25d0671 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -133,6 +133,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
Iterator((part.srcIds ++ part.dstIds).toSet)
}.collect
assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+
+ // Forming triplets view
+ val g = Graph(
+ sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
+ sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
+ assert(g.triplets.collect.map(_.toTuple).toSet ===
+ Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
+ val gPart = g.partitionBy(EdgePartition2D)
+ assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+ Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}