author     Ankur Dave <ankurdave@gmail.com>    2014-04-16 17:16:55 -0700
committer  Reynold Xin <rxin@apache.org>       2014-04-16 17:16:55 -0700
commit     17d323455a9c8b640f149be4a81139ed638765b5 (patch)
tree       0a697d537c508d6d2d2d1197c3aa92fd523146c6 /graphx
parent     235a47ce14b3c7523e79ce671355dea7ee06f4b7 (diff)
SPARK-1329: Create pid2vid with correct number of partitions
Each vertex partition is co-located with a pid2vid array created in RoutingTable.scala. This array maps edge partition IDs to the list of vertices in the current vertex partition that are mentioned by edges in that partition. Therefore the pid2vid array should have one entry per edge partition. GraphX currently creates one entry per *vertex* partition, which is a bug that leads to an ArrayIndexOutOfBoundsException when there are more edge partitions than vertex partitions. This commit fixes the bug and adds a test for this case.

Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #368 from ankurdave/fix-pid2vid-size and squashes the following commits:

5a5c52a [Ankur Dave] SPARK-1329: Create pid2vid with correct number of partitions
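For context, here is a minimal, self-contained Scala sketch of the issue. It is not the actual RoutingTable code; the partition counts, sample data, and helper names are made up for illustration. The point is that edge partition IDs index into pid2vid, so sizing the array by the vertex partition count overflows as soon as there are more edge partitions than vertex partitions.

import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only: mirrors the shape of the routing-table build,
// not the real GraphX implementation.
object Pid2VidSketch {
  type VertexId = Long

  // Hypothetical setup: 2 edge partitions but only 1 vertex partition.
  val numEdgePartitions = 2
  val numVertexPartitions = 1

  // (vid, pid) pairs meaning "vertex vid is referenced by edge partition pid".
  val vid2pid: Seq[(VertexId, Int)] = Seq((1L, 0), (2L, 0), (1L, 1), (2L, 1))

  // Build the per-vertex-partition routing array with the given number of slots.
  def buildPid2Vid(numSlots: Int): Array[ArrayBuffer[VertexId]] = {
    val pid2vid = Array.fill(numSlots)(new ArrayBuffer[VertexId])
    for ((vid, pid) <- vid2pid) {
      pid2vid(pid) += vid // pid is an *edge* partition ID, so one slot per edge partition is needed
    }
    pid2vid
  }

  def main(args: Array[String]): Unit = {
    // Correct sizing: one slot per edge partition.
    buildPid2Vid(numEdgePartitions).zipWithIndex.foreach { case (vids, pid) =>
      println(s"edge partition $pid -> ${vids.mkString(", ")}")
    }
    // Buggy sizing (one slot per *vertex* partition) fails once pid >= numVertexPartitions.
    try buildPid2Vid(numVertexPartitions)
    catch { case e: ArrayIndexOutOfBoundsException => println(s"buggy sizing fails: $e") }
  }
}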
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala |  4
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala        | 12
2 files changed, 14 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index fe44e1ee0c..022d5668e2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
       vSet.iterator.map { vid => (vid, pid) }
     }
 
-    val numPartitions = vertices.partitions.size
+    val numEdgePartitions = edges.partitions.size
     vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
-      val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
+      val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
       for ((vid, pid) <- iter) {
         pid2vid(pid) += vid
       }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index d9ba4672ce..32b5fe4813 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("more edge partitions than vertex partitions") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
+      val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
+      val graph = Graph(verts, edges)
+      val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
+        .collect.toSet
+      assert(triplets ===
+        Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
+    }
+  }
+
 }
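For reference, the scenario exercised by the new test can also be reproduced outside the test suite. A minimal sketch, assuming a Spark build with GraphX on the classpath; the object name and local master setting are illustrative, not part of the commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object Spark1329Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SPARK-1329 repro"))
    try {
      // Two vertices in one partition, two edges across two partitions,
      // i.e. more edge partitions than vertex partitions.
      val verts = sc.parallelize(List((1L, "a"), (2L, "b")), 1)
      val edges = sc.parallelize(List(Edge(1L, 2L, 0), Edge(2L, 1L, 0)), 2)
      val graph = Graph(verts, edges)
      // Before this fix, materializing the triplets could fail with an
      // ArrayIndexOutOfBoundsException while building the routing table.
      graph.triplets.map(t => (t.srcId, t.dstId, t.srcAttr, t.dstAttr)).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}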