aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Reynold Xin <rxin@apache.org> 2013-12-20 10:56:10 -0800
committer Reynold Xin <rxin@apache.org> 2013-12-20 10:56:10 -0800
commit ac70b8f234493fa670104f0599669500697d2533 (patch)
tree 3362854cf3a55760406ab281fa8ea95d6bac5ac2
parent 45310d4a8b8d63c5bff9a6863f70dc244917990b (diff)
parent efc765cf1a287c398e3321c374263a740200fe89 (diff)
download spark-ac70b8f234493fa670104f0599669500697d2533.tar.gz
spark-ac70b8f234493fa670104f0599669500697d2533.tar.bz2
spark-ac70b8f234493fa670104f0599669500697d2533.zip
Merge pull request #117 from ankurdave/more-tests
More tests
-rw-r--r-- graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala 42
-rw-r--r-- graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala 41
2 files changed, 75 insertions, 8 deletions
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 487d949e1f..e6c19dbc40 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -83,8 +83,32 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
// the same partition
assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
- // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
- // replication
+ // partitionBy(EdgePartition2D) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
+
+ // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
+ // partitions
+ val n = 100
+ val p = 100
+ val verts = 1 to n
+ // Divisibility graph: one edge tuple (x, y) for every pair in 1..n where x divides y,
+ // parallelized into p partitions.
+ val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
+ verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
+ assert(graph.edges.partitions.length === p)
+ val partitionedGraph = graph.partitionBy(EdgePartition2D)
+ // Check the repartitioned graph (not the original a second time): the bound below is stated
+ // in terms of p, so repartitioning must keep p edge partitions.
+ assert(partitionedGraph.edges.partitions.length === p)
+ val bound = 2 * math.sqrt(p)
+ // Each vertex should be replicated to at most 2 * sqrt(p) partitions
+ // NOTE(review): iter.next() assumes every partition of partitionsRDD holds exactly one
+ // (id, EdgePartition) pair -- confirm against EdgeRDD.
+ val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ // Set of all vertex ids that appear as a source or destination in this edge partition.
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ // This should not be true for the default hash partitioning
+ val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ // At least one vertex must exceed the bound without 2D partitioning, otherwise the check
+ // above would not be exercising EdgePartition2D at all.
+ assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
}
}
@@ -114,7 +138,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
test("mapTriplets") {
- // TODO(ankurdave): Write the test
+ // mapTriplets should replace each edge attribute with srcAttr + dstAttr; for the star graph
+ // built by starGraph this is expected to be "vv" on every edge from vertex 0.
+ withSpark { sc =>
+ val n = 5
+ val mappedEdges =
+ starGraph(sc, n).mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet
+ val expectedEdges = (1L to n).map(x => Edge(0, x, "vv")).toSet
+ assert(mappedEdges === expectedEdges)
+ }
}
test("reverse") {
@@ -223,12 +252,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val reverseStar = starGraph(sc, n).reverse
+ // outerJoinVertices changing type
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+ // outerJoinVertices preserving type
+ val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
+ val newReverseStar =
+ reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
+ assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+ (0 to n).map(x => "v%d".format(x)).toSet)
}
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index caedb55ea2..a52a5653e2 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -4,15 +4,46 @@ import scala.util.Random
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.graph.Graph._
import org.apache.spark.graph._
-import org.apache.spark.rdd._
-
class EdgePartitionSuite extends FunSuite {
- test("sort") {
+ test("reverse") {
+ // A three-edge cycle over vertices 0, 1, 2 and the same edges with endpoints swapped, listed
+ // in the order the reversed partition is expected to yield them.
+ val original = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val flipped = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
+ val partitionBuilder = new EdgePartitionBuilder[Int]
+ original.foreach(e => partitionBuilder.add(e.srcId, e.dstId, e.attr))
+ val partition = partitionBuilder.toEdgePartition
+ // Each element is copied while iterating -- presumably because the iterator may reuse a
+ // single Edge instance (TODO confirm).
+ val reversedOnce = partition.reverse.iterator.map(_.copy()).toList
+ assert(reversedOnce === flipped)
+ // Reversing twice must give back the original edges.
+ val reversedTwice = partition.reverse.reverse.iterator.map(_.copy()).toList
+ assert(reversedTwice === original)
+ }
+
+ test("map") {
+ // Mapping an edge partition should rewrite only the attribute: here each edge's attribute
+ // becomes srcId + dstId while the endpoints stay as they were.
+ val input = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val partitionBuilder = new EdgePartitionBuilder[Int]
+ input.foreach(e => partitionBuilder.add(e.srcId, e.dstId, e.attr))
+ val partition = partitionBuilder.toEdgePartition
+ val mapped = partition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList
+ val expected = input.map(e => e.copy(attr = e.srcId + e.dstId))
+ assert(mapped === expected)
+ }
+
+ test("groupEdges") {
+ // Every (src, dst) pair appears twice. The attributes are distinct powers of two, so each
+ // merged sum (1 + 8, 2 + 16, 4 + 32) shows exactly which two edges were combined.
+ val duplicated = List(
+ Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
+ val merged = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
+ val partitionBuilder = new EdgePartitionBuilder[Int]
+ duplicated.foreach(e => partitionBuilder.add(e.srcId, e.dstId, e.attr))
+ val partition = partitionBuilder.toEdgePartition
+ val grouped = partition.groupEdges(_ + _).iterator.map(_.copy()).toList
+ assert(grouped === merged)
+ }
+
+ test("indexIterator") {
val edgesFrom0 = List(Edge(0, 1, 0))
val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
val sortedEdges = edgesFrom0 ++ edgesFrom1