author     Ankur Dave <ankurdave@gmail.com>    2013-12-19 19:49:07 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2013-12-19 19:49:07 -0800
commit     da9f5e3fc093a91e0e91bc9311d5f5d085dbc929 (patch)
tree       ff689f2434e026210cffd9d6306d4c1aad7d061b
parent     da301b57fc7f606e2b8fd0acaf95aa1bd9b643b0 (diff)
Split GraphSuite; simplify LocalSparkContext
8 files changed, 344 insertions, 264 deletions
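The heart of the change is the rewritten `LocalSparkContext` trait: instead of each suite setting the Kryo serializer properties and each test constructing its own `SparkContext`, the trait now sets those properties once and exposes a `withSpark { sc => ... }` helper that creates a local context, runs the test body, and stops the context in a `finally` block. A minimal sketch of the resulting test pattern (the suite and test below are illustrative, not part of the patch):

```scala
package org.apache.spark.graph

import org.scalatest.FunSuite

// Hypothetical suite showing the pattern this commit applies throughout
// the graph test suites.
class ExampleSuite extends FunSuite with LocalSparkContext {
  test("parallelize and count") {
    // withSpark builds a SparkContext("local", "test"), hands it to the
    // body, and stops it in a finally block even if an assertion fails.
    withSpark { sc =>
      assert(sc.parallelize(1 to 10).count() === 10)
    }
  }
}
```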
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index d36339b65b..1e6d8ec7cf 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -7,8 +7,6 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.graph.algorithms._
 import org.apache.spark.rdd._
 
-import org.apache.spark.graph.LocalSparkContext._
-
 import org.apache.spark.graph.util.GraphGenerators
 
@@ -48,16 +46,13 @@ object GridPageRank {
 
 class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
-  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
   def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
     a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
       .map { case (id, error) => error }.sum
   }
 
   test("Star PageRank") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val nVertices = 100
       val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
       val resetProb = 0.15
@@ -89,7 +84,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Grid PageRank") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val rows = 10
       val cols = 10
       val resetProb = 0.15
@@ -111,7 +106,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Chain PageRank") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val chain1 = (0 until 9).map(x => (x, x+1) )
       val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
       val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
@@ -131,7 +126,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Grid Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
       val ccGraph = ConnectedComponents.run(gridGraph).cache()
       val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
@@ -141,7 +136,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Reverse Grid Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
       val ccGraph = ConnectedComponents.run(gridGraph).cache()
       val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
@@ -151,7 +146,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
 
   test("Chain Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val chain1 = (0 until 9).map(x => (x, x+1) )
       val chain2 = (10 until 20).map(x => (x, x+1) )
       val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
@@ -174,7 +169,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   } // end of chain connected components
 
   test("Reverse Chain Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val chain1 = (0 until 9).map(x => (x, x+1) )
       val chain2 = (10 until 20).map(x => (x, x+1) )
       val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
@@ -200,7 +195,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   } // end of reverse chain connected components
 
   test("Island Strongly Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
       val edges = sc.parallelize(Seq.empty[Edge[Int]])
       val graph = Graph(vertices, edges)
@@ -212,7 +207,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Cycle Strongly Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
       val graph = Graph.fromEdgeTuples(rawEdges, -1)
       val sccGraph = StronglyConnectedComponents.run(graph, 20)
@@ -223,7 +218,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("2 Cycle Strongly Connected Components") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val edges =
         Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
         Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
@@ -243,7 +238,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Count a single triangle") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
       val triangleCount = TriangleCount.run(graph)
@@ -253,7 +248,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Count two triangles") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
         Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val rawEdges = sc.parallelize(triangles, 2)
@@ -271,7 +266,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Count two triangles with bi-directed edges") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val triangles =
         Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
         Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
@@ -291,7 +286,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Count a single triangle with duplicate edges") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
         Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
@@ -302,14 +297,14 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Test SVD++ with mean square error on training set") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val SvdppErr = 0.01
-      val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+      val edges = sc.textFile("mllib/data/als/test.data").map { line =>
         val fields = line.split(",")
         Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
       }
       val graph = Svdpp.run(edges)
-      val err = graph.vertices.collect.map{ case (vid, vd) =>
+      val err = graph.vertices.collect.map{ case (vid, vd) =>
         if (vid % 2 == 1) { vd.norm } else { 0.0 }
       }.reduce(_ + _) / graph.triplets.collect.size
       assert(err < SvdppErr)
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
new file mode 100644
index 0000000000..c055e461b7
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.graph
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.rdd._
+
+class GraphOpsSuite extends FunSuite with LocalSparkContext {
+
+  test("aggregateNeighbors") {
+    withSpark { sc =>
+      val n = 3
+      val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
+
+      val indegrees = star.aggregateNeighbors(
+        (vid, edge) => Some(1),
+        (a: Int, b: Int) => a + b,
+        EdgeDirection.In)
+      assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
+
+      val outdegrees = star.aggregateNeighbors(
+        (vid, edge) => Some(1),
+        (a: Int, b: Int) => a + b,
+        EdgeDirection.Out)
+      assert(outdegrees.collect().toSet === Set((0, n)))
+
+      val noVertexValues = star.aggregateNeighbors[Int](
+        (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
+        (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
+        EdgeDirection.In)
+      assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+    }
+  }
+
+  test("joinVertices") {
+    withSpark { sc =>
+      val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+      val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+      val g: Graph[String, String] = Graph(vertices, edges)
+
+      val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
+      val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+
+      val v = g1.vertices.collect().toSet
+      assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
+    }
+  }
+
+  test("collectNeighborIds") {
+    withSpark { sc =>
+      val chain = (0 until 100).map(x => (x, (x+1)%100) )
+      val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
+      val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
+      val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
+      assert(nbrs.count === chain.size)
+      assert(graph.numVertices === nbrs.count)
+      nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
+      nbrs.collect.foreach { case (vid, nbrs) =>
+        val s = nbrs.toSet
+        assert(s.contains((vid + 1) % 100))
+        assert(s.contains(if (vid > 0) vid - 1 else 99 ))
+      }
+    }
+  }
+
+  test ("filter") {
+    withSpark { sc =>
+      val n = 5
+      val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+      val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+      val graph: Graph[Int, Int] = Graph(vertices, edges)
+      val filteredGraph = graph.filter(
+        graph => {
+          val degrees: VertexRDD[Int] = graph.outDegrees
+          graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
+        },
+        vpred = (vid: Vid, deg:Int) => deg > 0
+      )
+
+      val v = filteredGraph.vertices.collect().toSet
+      assert(v === Set((0,0)))
+
+      // the map is necessary because of object-reuse in the edge iterator
+      val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+      assert(e.isEmpty)
+    }
+  }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index f6bb201a83..09da102350 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -6,32 +6,47 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
 import org.apache.spark.graph.Graph._
-import org.apache.spark.graph.LocalSparkContext._
 import org.apache.spark.graph.impl.EdgePartition
 import org.apache.spark.graph.impl.EdgePartitionBuilder
 import org.apache.spark.rdd._
 
 class GraphSuite extends FunSuite with LocalSparkContext {
 
-  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+  def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
+    Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+  }
 
-  test("Graph Creation") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
-      val edges = sc.parallelize(rawEdges)
-      val graph = Graph.fromEdgeTuples(edges, 1.0F)
-      assert(graph.edges.count() === rawEdges.size)
+  test("Graph.fromEdgeTuples") {
+    withSpark { sc =>
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L)
+      val doubleRing = ring ++ ring
+      val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
+      assert(graph.edges.count() === doubleRing.size)
+      assert(graph.edges.collect.forall(e => e.attr == 1))
+
+      // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
+      val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
+      assert(uniqueGraph.edges.count() === ring.size)
+      assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
     }
   }
 
-  test("Graph Creation with invalid vertices") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+  test("Graph.fromEdges") {
+    withSpark { sc =>
+      val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+      val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)
+      assert(graph.edges.count() === ring.size)
+    }
+  }
+
+  test("Graph.apply") {
+    withSpark { sc =>
       val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
       val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
       val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
       val graph = Graph(vertices, edges, false)
       assert( graph.edges.count() === rawEdges.size )
+      // Vertices not explicitly provided but referenced by edges should be created automatically
       assert( graph.vertices.count() === 100)
       graph.triplets.map { et =>
         assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
@@ -40,153 +55,97 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test("core operations") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+  test("triplets") {
+    withSpark { sc =>
       val n = 5
-      val star = Graph.fromEdgeTuples(
-        sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
-      // triplets
+      val star = starGraph(sc, n)
       assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
         (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
-      // reverse
-      val reverseStar = star.reverse
-      assert(reverseStar.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
-      // outerJoinVertices
-      val reverseStarDegrees =
-        reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
-      val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
-        et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
-        (a: Int, b: Int) => a + b).collect.toSet
-      assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+    }
+  }
+
+  test("partitionBy") {
+    withSpark { sc =>
+      def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+      def nonemptyParts(graph: Graph[Int, Int]) = {
+        graph.edges.partitionsRDD.mapPartitions { iter =>
+          Iterator(iter.next()._2.iterator.toList)
+        }.filter(_.nonEmpty)
+      }
+      val identicalEdges = List((0L, 1L), (0L, 1L))
+      val canonicalEdges = List((0L, 1L), (1L, 0L))
+      val sameSrcEdges = List((0L, 1L), (0L, 2L))
+
+      // The two edges start out in different partitions
+      for (edges <- List(identicalEdges, canonicalEdges, sameSrcEdges)) {
+        assert(nonemptyParts(mkGraph(edges)).count === 2)
+      }
+      // partitionBy(RandomVertexCut) puts identical edges in the same partition
+      assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1)
+      // partitionBy(EdgePartition1D) puts same-source edges in the same partition
+      assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
+      // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
+      // the same partition
+      assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+      // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
+      // replication
+    }
+  }
+
+  test("mapVertices") {
+    withSpark { sc =>
+      val n = 5
+      val star = starGraph(sc, n)
       // mapVertices preserving type
-      val mappedVAttrs = reverseStar.mapVertices((vid, attr) => attr + "2")
+      val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
       assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
       // mapVertices changing type
-      val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
+      val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
       assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
-      // groupEdges
-      val doubleStar = Graph.fromEdgeTuples(
-        sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
-      val star2 = doubleStar.groupEdges { (a, b) => a}
-      assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
-        star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
-      assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
     }
   }
 
   test("mapEdges") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val n = 3
-      val star = Graph.fromEdgeTuples(
-        sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
+      val star = starGraph(sc, n)
       val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
 
-      // map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
-      val edges = starWithEdgeAttrs.edges.map(_.copy()).collect()
+      val edges = starWithEdgeAttrs.edges.collect()
       assert(edges.size === n)
       assert(edges.toSet === (1 to n).map(x => Edge(0, x, x)).toSet)
     }
   }
 
-  test("mapReduceTriplets") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val n = 5
-      val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
-      val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
-      val neighborDegreeSums = starDeg.mapReduceTriplets(
-        edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
-        (a: Int, b: Int) => a + b)
-      assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
-
-      // activeSetOpt
-      val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
-      val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
-      val vids = complete.mapVertices((vid, attr) => vid).cache()
-      val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
-      val numEvenNeighbors = vids.mapReduceTriplets(et => {
-        // Map function should only run on edges with destination in the active set
-        if (et.dstId % 2 != 0) {
-          throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
-        }
-        Iterator((et.srcId, 1))
-      }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
-      assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
-
-      // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
-      val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
-        .mapVertices((vid, attr) => vid).cache()
-      val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
-      val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-      val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
-        // Map function should only run on edges with source in the active set
-        if (et.srcId % 2 != 1) {
-          throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
-        }
-        Iterator((et.dstId, 1))
-      }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
-      assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
-
-    }
+  test("mapTriplets") {
   }
 
-  test("aggregateNeighbors") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val n = 3
-      val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
-
-      val indegrees = star.aggregateNeighbors(
-        (vid, edge) => Some(1),
-        (a: Int, b: Int) => a + b,
-        EdgeDirection.In)
-      assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
-
-      val outdegrees = star.aggregateNeighbors(
-        (vid, edge) => Some(1),
-        (a: Int, b: Int) => a + b,
-        EdgeDirection.Out)
-      assert(outdegrees.collect().toSet === Set((0, n)))
-
-      val noVertexValues = star.aggregateNeighbors[Int](
-        (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
-        (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
-        EdgeDirection.In)
-      assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+  test("reverse") {
+    withSpark { sc =>
+      val n = 5
+      val star = starGraph(sc, n)
+      assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
     }
   }
 
-  test("joinVertices") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
-      val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
-      val g: Graph[String, String] = Graph(vertices, edges)
-
-      val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
-      val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+  test("subgraph") {
+    withSpark { sc =>
+      // Create a star graph of 10 veritces.
+      val n = 10
+      val star = starGraph(sc, n)
+      // Take only vertices whose vids are even
+      val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
 
-      val v = g1.vertices.collect().toSet
-      assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
-    }
-  }
+      // We should have 5 vertices.
+      assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
 
-  test("collectNeighborIds") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val chain = (0 until 100).map(x => (x, (x+1)%100) )
-      val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
-      val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
-      val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
-      assert(nbrs.count === chain.size)
-      assert(graph.numVertices === nbrs.count)
-      nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
-      nbrs.collect.foreach { case (vid, nbrs) =>
-        val s = nbrs.toSet
-        assert(s.contains((vid + 1) % 100))
-        assert(s.contains(if (vid > 0) vid - 1 else 99 ))
-      }
+      // And 4 edges.
+      assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
     }
   }
 
   test("mask") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val n = 5
       val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
       val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
@@ -209,90 +168,71 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-  test ("filter") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+  test("groupEdges") {
+    withSpark { sc =>
       val n = 5
-      val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
-      val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
-      val graph: Graph[Int, Int] = Graph(vertices, edges)
-      val filteredGraph = graph.filter(
-        graph => {
-          val degrees: VertexRDD[Int] = graph.outDegrees
-          graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
-        },
-        vpred = (vid: Vid, deg:Int) => deg > 0
-      )
-
-      val v = filteredGraph.vertices.collect().toSet
-      assert(v === Set((0,0)))
-
-      // the map is necessary because of object-reuse in the edge iterator
-      val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
-      assert(e.isEmpty)
+      val star = starGraph(sc, n)
+      val doubleStar = Graph.fromEdgeTuples(
+        sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+      val star2 = doubleStar.groupEdges { (a, b) => a}
+      assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+        star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+      assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
     }
   }
 
-  test("VertexSetRDD") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val n = 100
-      val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
-      val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
-      assert(b.count === n + 1)
-      assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
-      val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
-      assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
-      val d = c.filter(q => ((q._2 % 2) == 0))
-      val e = a.filter(q => ((q._2 % 2) == 0))
-      assert(d.count === e.count)
-      assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
-      val f = b.mapValues(x => if (x % 2 == 0) -x else x)
-      assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
-    }
-  }
+  test("mapReduceTriplets") {
+    withSpark { sc =>
+      val n = 5
+      val star = starGraph(sc, n).mapVertices { (_, _) => 0 }
+      val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
+      val neighborDegreeSums = starDeg.mapReduceTriplets(
+        edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
+        (a: Int, b: Int) => a + b)
+      assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
 
-  test("subgraph") {
-    withSpark(new SparkContext("local", "test")) { sc =>
-      // Create a star graph of 10 veritces.
-      val n = 10
-      val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
-      // Take only vertices whose vids are even
-      val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+      // activeSetOpt
+      val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
+      val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
+      val vids = complete.mapVertices((vid, attr) => vid).cache()
+      val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
+      val numEvenNeighbors = vids.mapReduceTriplets(et => {
+        // Map function should only run on edges with destination in the active set
+        if (et.dstId % 2 != 0) {
+          throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
+        }
+        Iterator((et.srcId, 1))
+      }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+      assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
 
-      // We should have 5 vertices.
-      assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
+      // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
+      val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
+        .mapVertices((vid, attr) => vid).cache()
+      val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+      val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+      val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
+        // Map function should only run on edges with source in the active set
+        if (et.srcId % 2 != 1) {
+          throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
+        }
+        Iterator((et.dstId, 1))
+      }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+      assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
 
-      // And 4 edges.
-      assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
     }
   }
 
-  test("EdgePartition.sort") {
-    val edgesFrom0 = List(Edge(0, 1, 0))
-    val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
-    val sortedEdges = edgesFrom0 ++ edgesFrom1
-    val builder = new EdgePartitionBuilder[Int]
-    for (e <- Random.shuffle(sortedEdges)) {
-      builder.add(e.srcId, e.dstId, e.attr)
+  test("outerJoinVertices") {
+    withSpark { sc =>
+      val n = 5
+      val reverseStar = starGraph(sc, n).reverse
+      val reverseStarDegrees =
+        reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+      val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+        et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
+        (a: Int, b: Int) => a + b).collect.toSet
+      assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
     }
-
-    val edgePartition = builder.toEdgePartition
-    assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
-    assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
-    assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
   }
 
-  test("EdgePartition.innerJoin") {
-    def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
-      val builder = new EdgePartitionBuilder[A]
-      for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
-      builder.toEdgePartition
-    }
-    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
-    val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
-    val a = makeEdgePartition(aList)
-    val b = makeEdgePartition(bList)
-
-    assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
-      List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
-  }
 }
diff --git a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
index 4a0155b6bd..5c20d559aa 100644
--- a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
@@ -6,39 +6,23 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkContext
 
-/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
-
-  @transient var sc: SparkContext = _
-
-  override def afterEach() {
-    resetSparkContext()
-    super.afterEach()
-  }
-
-  def resetSparkContext() = {
-    if (sc != null) {
-      LocalSparkContext.stop(sc)
-      sc = null
-    }
-  }
-
-}
-
-object LocalSparkContext {
-  def stop(sc: SparkContext) {
-    sc.stop()
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-  }
-
-  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+/**
+ * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
+ * after each test.
+*/
+trait LocalSparkContext {
+  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+  /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
+  def withSpark[T](f: SparkContext => T) = {
+    val sc = new SparkContext("local", "test")
     try {
       f(sc)
     } finally {
-      stop(sc)
+      sc.stop()
+      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+      System.clearProperty("spark.driver.port")
     }
   }
-
 }
diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
index 0897d9783e..44182e85ee 100644
--- a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
@@ -3,16 +3,12 @@ package org.apache.spark.graph
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
-import org.apache.spark.graph.LocalSparkContext._
 import org.apache.spark.rdd._
 
 class PregelSuite extends FunSuite with LocalSparkContext {
 
-  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
   test("1 iteration") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val n = 5
       val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
       val result = Pregel(star, 0)(
@@ -24,7 +20,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
   }
 
   test("chain propagation") {
-    withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark { sc =>
       val n = 5
       val chain = Graph.fromEdgeTuples(
         sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
index 6b86f9b25d..80075f3437 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
@@ -7,7 +7,6 @@ import scala.util.Random
 import org.scalatest.FunSuite
 
 import org.apache.spark._
-import org.apache.spark.graph.LocalSparkContext._
 import org.apache.spark.graph.impl._
 import org.apache.spark.graph.impl.MsgRDDFunctions._
 import org.apache.spark.serializer.SerializationStream
@@ -15,9 +14,6 @@ import org.apache.spark.serializer.SerializationStream
 
 class SerializerSuite extends FunSuite with LocalSparkContext {
 
-  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
   test("IntVertexBroadcastMsgSerializer") {
     val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
     val bout = new ByteArrayOutputStream
@@ -139,7 +135,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("TestShuffleVertexBroadcastMsg") {
-    withSpark(new SparkContext("local[2]", "test")) { sc =>
+    withSpark { sc =>
       val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
         new VertexBroadcastMsg[Int](pid, pid, pid)
       }
diff --git a/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala b/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala
new file mode 100644
index 0000000000..316968bbf0
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.graph
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.rdd._
+
+class VertexRDDSuite extends FunSuite with LocalSparkContext {
+
+  test("VertexRDD") {
+    withSpark { sc =>
+      val n = 100
+      val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
+      val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
+      assert(b.count === n + 1)
+      assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
+      val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
+      assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
+      val d = c.filter(q => ((q._2 % 2) == 0))
+      val e = a.filter(q => ((q._2 % 2) == 0))
+      assert(d.count === e.count)
+      assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
+      val f = b.mapValues(x => if (x % 2 == 0) -x else x)
+      assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
+    }
+  }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
new file mode 100644
index 0000000000..2bce90120d
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.graph.impl
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph._
+import org.apache.spark.rdd._
+
+class EdgePartitionSuite extends FunSuite {
+
+  test("EdgePartition.sort") {
+    val edgesFrom0 = List(Edge(0, 1, 0))
+    val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+    val sortedEdges = edgesFrom0 ++ edgesFrom1
+    val builder = new EdgePartitionBuilder[Int]
+    for (e <- Random.shuffle(sortedEdges)) {
+      builder.add(e.srcId, e.dstId, e.attr)
+    }
+
+    val edgePartition = builder.toEdgePartition
+    assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+    assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+    assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+  }
+
+  test("EdgePartition.innerJoin") {
+    def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+      val builder = new EdgePartitionBuilder[A]
+      for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
+      builder.toEdgePartition
+    }
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+    val a = makeEdgePartition(aList)
+    val b = makeEdgePartition(bList)
+
+    assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
+      List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
+  }
+}