about summary refs log tree commit diff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-20 01:06:49 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-20 01:06:49 -0800
commit752c0106e8d935875a06ec21b05de2c1f3dbc56a (patch)
tree893b77aeab608492b831780caa666aaedf1f0626 /graph/src
parentc3ec91a462c1e9582b9cb08a231f2aad10e4e52e (diff)
downloadspark-752c0106e8d935875a06ec21b05de2c1f3dbc56a.tar.gz
spark-752c0106e8d935875a06ec21b05de2c1f3dbc56a.tar.bz2
spark-752c0106e8d935875a06ec21b05de2c1f3dbc56a.zip
Test EdgePartition2D
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala28
1 file changed, 26 insertions, 2 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 6494ef8900..e6c19dbc40 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -83,8 +83,32 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
// the same partition
assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
- // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
- // replication
+ // partitionBy(EdgePartition2D) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
+
+ // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
+ // partitions
+ val n = 100
+ val p = 100
+ val verts = 1 to n
+ val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
+ verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
+ assert(graph.edges.partitions.length === p)
+ val partitionedGraph = graph.partitionBy(EdgePartition2D)
+ assert(graph.edges.partitions.length === p)
+ val bound = 2 * math.sqrt(p)
+ // Each vertex should be replicated to at most 2 * sqrt(p) partitions
+ val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ // This should not be true for the default hash partitioning
+ val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
}
}