diff options
-rw-r--r-- | graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index 6494ef8900..e6c19dbc40 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -83,8 +83,32 @@ class GraphSuite extends FunSuite with LocalSparkContext { // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into // the same partition assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1) - // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex - // replication + // partitionBy(EdgePartition2D) puts identical edges in the same partition + assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1) + + // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p) + // partitions + val n = 100 + val p = 100 + val verts = 1 to n + val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => + verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0) + assert(graph.edges.partitions.length === p) + val partitionedGraph = graph.partitionBy(EdgePartition2D) + assert(graph.edges.partitions.length === p) + val bound = 2 * math.sqrt(p) + // Each vertex should be replicated to at most 2 * sqrt(p) partitions + val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter => + val part = iter.next()._2 + Iterator((part.srcIds ++ part.dstIds).toSet) + }.collect + assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) + // This should not be true for the default hash partitioning + val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter => + val part = iter.next()._2 + Iterator((part.srcIds ++ part.dstIds).toSet) + }.collect + assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound)) } } |