aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala4
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala12
2 files changed, 14 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index fe44e1ee0c..022d5668e2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
vSet.iterator.map { vid => (vid, pid) }
}
- val numPartitions = vertices.partitions.size
+ val numEdgePartitions = edges.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
+ val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index d9ba4672ce..32b5fe4813 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("more edge partitions than vertex partitions") {
+ withSpark { sc =>
+ val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
+ val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
+ val graph = Graph(verts, edges)
+ val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
+ .collect.toSet
+ assert(triplets ===
+ Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
+ }
+ }
+
}