From 95381dfb9461af65f31cc2f18a125b3a82abbc3f Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Thu, 19 Dec 2013 20:46:03 -0800
Subject: Split AnalyticsSuite into algorithms suites

---
 .../scala/org/apache/spark/graph/GraphLab.scala    |   5 +-
 .../org/apache/spark/graph/AnalyticsSuite.scala    | 313 ---------------------
 .../org/apache/spark/graph/GraphOpsSuite.scala     |   6 +-
 .../algorithms/ConnectedComponentsSuite.scala      |  83 ++++++
 .../spark/graph/algorithms/PageRankSuite.scala     | 126 +++++++++
 .../StronglyConnectedComponentsSuite.scala         |  57 ++++
 .../apache/spark/graph/algorithms/SvdppSuite.scala |  29 ++
 .../graph/algorithms/TriangleCountSuite.scala      |  73 +++++
 .../spark/graph/impl/EdgePartitionSuite.scala      |   5 +-
 9 files changed, 375 insertions(+), 322 deletions(-)
 delete mode 100644 graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
 create mode 100644 graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala

diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 5618ce6272..5d2f0f4bda 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -1,12 +1,13 @@
 package org.apache.spark.graph
 
+import org.apache.spark.Logging
 import scala.collection.JavaConversions._
 
 import org.apache.spark.rdd.RDD
 
 /**
  * This object implements the GraphLab gather-apply-scatter api.
  */
-object GraphLab {
+object GraphLab extends Logging {
   /**
    * Execute the GraphLab Gather-Apply-Scatter API
@@ -119,7 +120,7 @@ object GraphLab {
       numActive = activeGraph.vertices.map{
         case (vid, data) => if (data._1) 1 else 0
       }.reduce(_ + _)
-      println("Number active vertices: " + numActive)
+      logInfo("Number active vertices: " + numActive)
       i += 1
     }
 
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
deleted file mode 100644
index 77a193a9ac..0000000000
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ /dev/null
@@ -1,313 +0,0 @@
-package org.apache.spark.graph
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.graph.algorithms._
-import org.apache.spark.rdd._
-
-import org.apache.spark.graph.util.GraphGenerators
-
-
-object GridPageRank {
-  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
-    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
-    val outDegree = Array.fill(nRows * nCols)(0)
-    // Convert row column address into vertex ids (row major order)
-    def sub2ind(r: Int, c: Int): Int = r * nCols + c
-    // Make the grid graph
-    for (r <- 0 until nRows; c <- 0 until nCols) {
-      val ind = sub2ind(r,c)
-      if (r+1 < nRows) {
-        outDegree(ind) += 1
-        inNbrs(sub2ind(r+1,c)) += ind
-      }
-      if (c+1 < nCols) {
-        outDegree(ind) += 1
-        inNbrs(sub2ind(r,c+1)) += ind
-      }
-    }
-    // compute the pagerank
-    var pr = Array.fill(nRows * nCols)(resetProb)
-    for (iter <- 0 until nIter) {
-      val oldPr = pr
-      pr = new Array[Double](nRows * nCols)
-      for (ind <- 0 until (nRows * nCols)) {
-        pr(ind) = resetProb + (1.0 - resetProb) *
-          inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
-      }
-    }
-    (0L until (nRows * nCols)).zip(pr)
-  }
-
-}
-
-
-class AnalyticsSuite extends FunSuite with LocalSparkContext {
-
-  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
-    a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
-      .map { case (id, error) => error }.sum
-  }
-
-  test("Star PageRank") {
-    withSpark { sc =>
-      val nVertices = 100
-      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
-      val resetProb = 0.15
-      val errorTol = 1.0e-5
-
-      val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
-      val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
-
-      // Static PageRank should only take 2 iterations to converge
-      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
-        if (pr1 != pr2) 1 else 0
-      }.map { case (vid, test) => test }.sum
-      assert(notMatching === 0)
-
-      val staticErrors = staticRanks2.map { case (vid, pr) =>
-        val correct = (vid > 0 && pr == resetProb) ||
-          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
-        if (!correct) 1 else 0
-      }
-      assert(staticErrors.sum === 0)
-
-      val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
-      val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
-      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
-      assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
-    }
-  } // end of test Star PageRank
-
-
-
-  test("Grid PageRank") {
-    withSpark { sc =>
-      val rows = 10
-      val cols = 10
-      val resetProb = 0.15
-      val tol = 0.0001
-      val numIter = 50
-      val errorTol = 1.0e-5
-      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
-
-      val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
-      val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
-      val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
-      val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
-
-      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
-      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
-      assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
-    }
-  } // end of Grid PageRank
-
-
-  test("Chain PageRank") {
-    withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
-      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
-      val resetProb = 0.15
-      val tol = 0.0001
-      val numIter = 10
-      val errorTol = 1.0e-5
-
-      val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
-      val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
-      val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
-
-      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
-      assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
-    }
-  }
-
-
-  test("Grid Connected Components") {
-    withSpark { sc =>
-      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
-      val ccGraph = ConnectedComponents.run(gridGraph).cache()
-      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
-      assert(maxCCid === 0)
-    }
-  } // end of Grid connected components
-
-
-  test("Reverse Grid Connected Components") {
-    withSpark { sc =>
-      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
-      val ccGraph = ConnectedComponents.run(gridGraph).cache()
-      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
-      assert(maxCCid === 0)
-    }
-  } // end of Grid connected components
-
-
-  test("Chain Connected Components") {
-    withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val chain2 = (10 until 20).map(x => (x, x+1) )
-      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
-      val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
-      val ccGraph = ConnectedComponents.run(twoChains).cache()
-      val vertices = ccGraph.vertices.collect()
-      for ( (id, cc) <- vertices ) {
-        if(id < 10) { assert(cc === 0) }
-        else { assert(cc === 10) }
-      }
-      val ccMap = vertices.toMap
-      for (id <- 0 until 20) {
-        if (id < 10) {
-          assert(ccMap(id) === 0)
-        } else {
-          assert(ccMap(id) === 10)
-        }
-      }
-    }
-  } // end of chain connected components
-
-  test("Reverse Chain Connected Components") {
-    withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x+1) )
-      val chain2 = (10 until 20).map(x => (x, x+1) )
-      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
-      val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
-      val ccGraph = ConnectedComponents.run(twoChains).cache()
-      val vertices = ccGraph.vertices.collect
-      for ( (id, cc) <- vertices ) {
-        if (id < 10) {
-          assert(cc === 0)
-        } else {
-          assert(cc === 10)
-        }
-      }
-      val ccMap = vertices.toMap
-      for ( id <- 0 until 20 ) {
-        if (id < 10) {
-          assert(ccMap(id) === 0)
-        } else {
-          assert(ccMap(id) === 10)
-        }
-      }
-    }
-  } // end of reverse chain connected components
-
-  test("Island Strongly Connected Components") {
-    withSpark { sc =>
-      val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
-      val edges = sc.parallelize(Seq.empty[Edge[Int]])
-      val graph = Graph(vertices, edges)
-      val sccGraph = StronglyConnectedComponents.run(graph, 5)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        assert(id == scc)
-      }
-    }
-  }
-
-  test("Cycle Strongly Connected Components") {
-    withSpark { sc =>
-      val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
-      val graph = Graph.fromEdgeTuples(rawEdges, -1)
-      val sccGraph = StronglyConnectedComponents.run(graph, 20)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        assert(0L == scc)
-      }
-    }
-  }
-
-  test("2 Cycle Strongly Connected Components") {
-    withSpark { sc =>
-      val edges =
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
-        Array(6L -> 0L, 5L -> 7L)
-      val rawEdges = sc.parallelize(edges)
-      val graph = Graph.fromEdgeTuples(rawEdges, -1)
-      val sccGraph = StronglyConnectedComponents.run(graph, 20)
-      for ((id, scc) <- sccGraph.vertices.collect) {
-        if (id < 3)
-          assert(0L == scc)
-        else if (id < 6)
-          assert(3L == scc)
-        else
-          assert(id == scc)
-      }
-    }
-  }
-
-  test("Count a single triangle") {
-    withSpark { sc =>
-      val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
-      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
-      val triangleCount = TriangleCount.run(graph)
-      val verts = triangleCount.vertices
-      verts.collect.foreach { case (vid, count) => assert(count === 1) }
-    }
-  }
-
-  test("Count two triangles") {
-    withSpark { sc =>
-      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
-      val rawEdges = sc.parallelize(triangles, 2)
-      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
-      val triangleCount = TriangleCount.run(graph)
-      val verts = triangleCount.vertices
-      verts.collect().foreach { case (vid, count) =>
-        if (vid == 0) {
-          assert(count === 2)
-        } else {
-          assert(count === 1)
-        }
-      }
-    }
-  }
-
-  test("Count two triangles with bi-directed edges") {
-    withSpark { sc =>
-      val triangles =
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
-      val revTriangles = triangles.map { case (a,b) => (b,a) }
-      val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
-      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
-      val triangleCount = TriangleCount.run(graph)
-      val verts = triangleCount.vertices
-      verts.collect().foreach { case (vid, count) =>
-        if (vid == 0) {
-          assert(count === 4)
-        } else {
-          assert(count === 2)
-        }
-      }
-    }
-  }
-
-  test("Count a single triangle with duplicate edges") {
-    withSpark { sc =>
-      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
-      val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
-      val triangleCount = TriangleCount.run(graph)
-      val verts = triangleCount.vertices
-      verts.collect.foreach { case (vid, count) => assert(count === 1) }
-    }
-  }
-
-  test("Test SVD++ with mean square error on training set") {
-    withSpark { sc =>
-      val SvdppErr = 0.01
-      val edges = sc.textFile("mllib/data/als/test.data").map { line =>
-        val fields = line.split(",")
-        Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
-      }
-      val graph = Svdpp.run(edges)
-      val err = graph.vertices.collect.map{ case (vid, vd) =>
-        if (vid % 2 == 1) { vd.norm } else { 0.0 }
-      }.reduce(_ + _) / graph.triplets.collect.size
-      assert(err < SvdppErr)
-    }
-  }
-} // end of AnalyticsSuite
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
index c055e461b7..9e9213631f 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
@@ -1,14 +1,10 @@
 package org.apache.spark.graph
 
-import scala.util.Random
-
-import org.scalatest.FunSuite
-
 import org.apache.spark.SparkContext
 import org.apache.spark.graph.Graph._
 import org.apache.spark.graph.impl.EdgePartition
-import org.apache.spark.graph.impl.EdgePartitionBuilder
 import org.apache.spark.rdd._
+import org.scalatest.FunSuite
 
 class GraphOpsSuite extends FunSuite with LocalSparkContext {
 
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..81a1b7337f
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
@@ -0,0 +1,83 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+  test("Grid Connected Components") {
+    withSpark { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+      val ccGraph = ConnectedComponents.run(gridGraph).cache()
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      assert(maxCCid === 0)
+    }
+  } // end of Grid connected components
+
+
+  test("Reverse Grid Connected Components") {
+    withSpark { sc =>
+      val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
+      val ccGraph = ConnectedComponents.run(gridGraph).cache()
+      val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+      assert(maxCCid === 0)
+    }
+  } // end of Grid connected components
+
+
+  test("Chain Connected Components") {
+    withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+      val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val ccGraph = ConnectedComponents.run(twoChains).cache()
+      val vertices = ccGraph.vertices.collect()
+      for ( (id, cc) <- vertices ) {
+        if(id < 10) { assert(cc === 0) }
+        else { assert(cc === 10) }
+      }
+      val ccMap = vertices.toMap
+      for (id <- 0 until 20) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
+      }
+    }
+  } // end of chain connected components
+
+  test("Reverse Chain Connected Components") {
+    withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val chain2 = (10 until 20).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+      val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
+      val ccGraph = ConnectedComponents.run(twoChains).cache()
+      val vertices = ccGraph.vertices.collect
+      for ( (id, cc) <- vertices ) {
+        if (id < 10) {
+          assert(cc === 0)
+        } else {
+          assert(cc === 10)
+        }
+      }
+      val ccMap = vertices.toMap
+      for ( id <- 0 until 20 ) {
+        if (id < 10) {
+          assert(ccMap(id) === 0)
+        } else {
+          assert(ccMap(id) === 10)
+        }
+      }
+    }
+  } // end of reverse chain connected components
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
new file mode 100644
index 0000000000..81d82a5a6b
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
@@ -0,0 +1,126 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graph._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import org.apache.spark.graph.util.GraphGenerators
+
+
+object GridPageRank {
+  def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val outDegree = Array.fill(nRows * nCols)(0)
+    // Convert row column address into vertex ids (row major order)
+    def sub2ind(r: Int, c: Int): Int = r * nCols + c
+    // Make the grid graph
+    for (r <- 0 until nRows; c <- 0 until nCols) {
+      val ind = sub2ind(r,c)
+      if (r+1 < nRows) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r+1,c)) += ind
+      }
+      if (c+1 < nCols) {
+        outDegree(ind) += 1
+        inNbrs(sub2ind(r,c+1)) += ind
+      }
+    }
+    // compute the pagerank
+    var pr = Array.fill(nRows * nCols)(resetProb)
+    for (iter <- 0 until nIter) {
+      val oldPr = pr
+      pr = new Array[Double](nRows * nCols)
+      for (ind <- 0 until (nRows * nCols)) {
+        pr(ind) = resetProb + (1.0 - resetProb) *
+          inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+      }
+    }
+    (0L until (nRows * nCols)).zip(pr)
+  }
+
+}
+
+
+class PageRankSuite extends FunSuite with LocalSparkContext {
+
+  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+    a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
+      .map { case (id, error) => error }.sum
+  }
+
+  test("Star PageRank") {
+    withSpark { sc =>
+      val nVertices = 100
+      val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+      val resetProb = 0.15
+      val errorTol = 1.0e-5
+
+      val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+      val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+
+      // Static PageRank should only take 2 iterations to converge
+      val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+        if (pr1 != pr2) 1 else 0
+      }.map { case (vid, test) => test }.sum
+      assert(notMatching === 0)
+
+      val staticErrors = staticRanks2.map { case (vid, pr) =>
+        val correct = (vid > 0 && pr == resetProb) ||
+          (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+        if (!correct) 1 else 0
+      }
+      assert(staticErrors.sum === 0)
+
+      val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+      assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+      assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
+    }
+  } // end of test Star PageRank
+
+
+
+  test("Grid PageRank") {
+    withSpark { sc =>
+      val rows = 10
+      val cols = 10
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 50
+      val errorTol = 1.0e-5
+      val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+      val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+      val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+      val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+      assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+      assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+      assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
+    }
+  } // end of Grid PageRank
+
+
+  test("Chain PageRank") {
+    withSpark { sc =>
+      val chain1 = (0 until 9).map(x => (x, x+1) )
+      val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+      val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 10
+      val errorTol = 1.0e-5
+
+      val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+      val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+      val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+      assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+    }
+  }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..4afb158a68
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
@@ -0,0 +1,57 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+  test("Island Strongly Connected Components") {
+    withSpark { sc =>
+      val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
+      val edges = sc.parallelize(Seq.empty[Edge[Int]])
+      val graph = Graph(vertices, edges)
+      val sccGraph = StronglyConnectedComponents.run(graph, 5)
+      for ((id, scc) <- sccGraph.vertices.collect) {
+        assert(id == scc)
+      }
+    }
+  }
+
+  test("Cycle Strongly Connected Components") {
+    withSpark { sc =>
+      val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
+      val graph = Graph.fromEdgeTuples(rawEdges, -1)
+      val sccGraph = StronglyConnectedComponents.run(graph, 20)
+      for ((id, scc) <- sccGraph.vertices.collect) {
+        assert(0L == scc)
+      }
+    }
+  }
+
+  test("2 Cycle Strongly Connected Components") {
+    withSpark { sc =>
+      val edges =
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
+        Array(6L -> 0L, 5L -> 7L)
+      val rawEdges = sc.parallelize(edges)
+      val graph = Graph.fromEdgeTuples(rawEdges, -1)
+      val sccGraph = StronglyConnectedComponents.run(graph, 20)
+      for ((id, scc) <- sccGraph.vertices.collect) {
+        if (id < 3)
+          assert(0L == scc)
+        else if (id < 6)
+          assert(3L == scc)
+        else
+          assert(id == scc)
+      }
+    }
+  }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
new file mode 100644
index 0000000000..4ea675c2dc
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
@@ -0,0 +1,29 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class SvdppSuite extends FunSuite with LocalSparkContext {
+
+  test("Test SVD++ with mean square error on training set") {
+    withSpark { sc =>
+      val SvdppErr = 0.01
+      val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+        val fields = line.split(",")
+        Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
+      }
+      val graph = Svdpp.run(edges)
+      val err = graph.vertices.collect.map{ case (vid, vd) =>
+        if (vid % 2 == 1) { vd.norm } else { 0.0 }
+      }.reduce(_ + _) / graph.triplets.collect.size
+      assert(err < SvdppErr)
+    }
+  }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala
new file mode 100644
index 0000000000..274ab11f0c
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala
@@ -0,0 +1,73 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class TriangleCountSuite extends FunSuite with LocalSparkContext {
+
+  test("Count a single triangle") {
+    withSpark { sc =>
+      val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
+      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+      val triangleCount = TriangleCount.run(graph)
+      val verts = triangleCount.vertices
+      verts.collect.foreach { case (vid, count) => assert(count === 1) }
+    }
+  }
+
+  test("Count two triangles") {
+    withSpark { sc =>
+      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+      val rawEdges = sc.parallelize(triangles, 2)
+      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+      val triangleCount = TriangleCount.run(graph)
+      val verts = triangleCount.vertices
+      verts.collect().foreach { case (vid, count) =>
+        if (vid == 0) {
+          assert(count === 2)
+        } else {
+          assert(count === 1)
+        }
+      }
+    }
+  }
+
+  test("Count two triangles with bi-directed edges") {
+    withSpark { sc =>
+      val triangles =
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+      val revTriangles = triangles.map { case (a,b) => (b,a) }
+      val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
+      val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+      val triangleCount = TriangleCount.run(graph)
+      val verts = triangleCount.vertices
+      verts.collect().foreach { case (vid, count) =>
+        if (vid == 0) {
+          assert(count === 4)
+        } else {
+          assert(count === 2)
+        }
+      }
+    }
+  }
+
+  test("Count a single triangle with duplicate edges") {
+    withSpark { sc =>
+      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+      val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
+      val triangleCount = TriangleCount.run(graph)
+      val verts = triangleCount.vertices
+      verts.collect.foreach { case (vid, count) => assert(count === 1) }
+    }
+  }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index 2bce90120d..caedb55ea2 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -9,9 +9,10 @@ import org.apache.spark.graph.Graph._
 import org.apache.spark.graph._
 import org.apache.spark.rdd._
 
+
 class EdgePartitionSuite extends FunSuite {
 
-  test("EdgePartition.sort") {
+  test("sort") {
     val edgesFrom0 = List(Edge(0, 1, 0))
     val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
     val sortedEdges = edgesFrom0 ++ edgesFrom1
@@ -26,7 +27,7 @@ class EdgePartitionSuite extends FunSuite {
     assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
   }
 
-  test("EdgePartition.innerJoin") {
+  test("innerJoin") {
     def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
       val builder = new EdgePartitionBuilder[A]
       for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
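
A note on the test oracle carried over into PageRankSuite above: GridPageRank computes PageRank serially so the distributed results can be checked against it. Each iteration applies the standard update — a vertex's rank becomes the reset probability plus the damped sum of its in-neighbors' previous ranks, each divided by that neighbor's out-degree. The sketch below restates that oracle for an arbitrary edge list; it is illustrative only (PageRankSketch and referencePageRank are hypothetical names and are not part of this patch).

import scala.collection.mutable.ArrayBuffer

// Serial PageRank oracle in the style of GridPageRank above.
// All names here are illustrative, not part of the patch.
object PageRankSketch {
  def referencePageRank(
      numVertices: Int,
      edges: Seq[(Int, Int)],   // directed (src, dst) pairs
      numIter: Int,
      resetProb: Double): Array[Double] = {
    // Build in-neighbor lists and out-degrees, as GridPageRank does for the grid.
    val inNbrs = Array.fill(numVertices)(ArrayBuffer.empty[Int])
    val outDegree = Array.fill(numVertices)(0)
    for ((src, dst) <- edges) {
      outDegree(src) += 1
      inNbrs(dst) += src
    }
    // Ranks start at the reset probability, matching the oracle above.
    var pr = Array.fill(numVertices)(resetProb)
    for (_ <- 0 until numIter) {
      val oldPr = pr
      // Update: reset probability plus the damped sum of in-neighbor
      // ranks, each weighted by 1 / out-degree of that neighbor.
      pr = Array.tabulate(numVertices) { v =>
        resetProb + (1.0 - resetProb) *
          inNbrs(v).map(nbr => oldPr(nbr) / outDegree(nbr)).sum
      }
    }
    pr
  }

  def main(args: Array[String]) {
    // On a 3-cycle every vertex should converge to the same rank by symmetry.
    val ranks = referencePageRank(3, Seq(0 -> 1, 1 -> 2, 2 -> 0), 50, 0.15)
    ranks.foreach(println)
  }
}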