path: root/graph/src
author     Ankur Dave <ankurdave@gmail.com>    2013-12-19 20:46:03 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2013-12-19 20:46:03 -0800
commit     95381dfb9461af65f31cc2f18a125b3a82abbc3f (patch)
tree       3054afed0c5e1e0e94667621dfafcf213816d5da /graph/src
parent     a69465b1fa7250d036e1585543c225b6340e4790 (diff)
Split AnalyticsSuite into algorithms suites
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala    5
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala    313
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala    6
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala    83
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala    126
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala    57
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala    29
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala    73
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala    5
9 files changed, 375 insertions, 322 deletions
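
Note: each suite added under org.apache.spark.graph.algorithms below follows the same skeleton the old AnalyticsSuite used: a ScalaTest FunSuite mixed with LocalSparkContext, with every test body wrapped in withSpark so a fresh SparkContext is created and torn down per test. A minimal sketch of that skeleton (the suite name and test body here are illustrative, not part of the patch):

    package org.apache.spark.graph.algorithms

    import org.scalatest.FunSuite

    import org.apache.spark.graph._
    import org.apache.spark.graph.util.GraphGenerators

    // Skeleton shared by the suites in this commit: FunSuite + LocalSparkContext,
    // with withSpark supplying a per-test SparkContext.
    class ExampleAlgorithmSuite extends FunSuite with LocalSparkContext {
      test("connected components of a 10x10 grid collapse to component 0") {
        withSpark { sc =>
          val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
          val ccGraph = ConnectedComponents.run(gridGraph).cache()
          // Every vertex should end up in the component labeled by vertex 0.
          assert(ccGraph.vertices.map { case (_, ccId) => ccId }.sum === 0)
        }
      }
    }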
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 5618ce6272..5d2f0f4bda 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -1,12 +1,13 @@
package org.apache.spark.graph
+import org.apache.spark.Logging
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
/**
* This object implements the GraphLab gather-apply-scatter api.
*/
-object GraphLab {
+object GraphLab extends Logging {
/**
* Execute the GraphLab Gather-Apply-Scatter API
@@ -119,7 +120,7 @@ object GraphLab {
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
- println("Number active vertices: " + numActive)
+ logInfo("Number active vertices: " + numActive)
i += 1
}
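
The GraphLab change above replaces a bare println with Spark's Logging trait. A minimal sketch of the same pattern in isolation (the object name and message wording are illustrative):

    import org.apache.spark.Logging

    // Mixing in org.apache.spark.Logging provides logInfo/logDebug/logWarning,
    // so per-iteration diagnostics go through Spark's logger instead of stdout.
    object ActiveVertexReporter extends Logging {
      def report(iteration: Int, numActive: Long): Unit = {
        logInfo("Number active vertices: " + numActive + " (iteration " + iteration + ")")
      }
    }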
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
deleted file mode 100644
index 77a193a9ac..0000000000
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ /dev/null
@@ -1,313 +0,0 @@
-package org.apache.spark.graph
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.graph.algorithms._
-import org.apache.spark.rdd._
-
-import org.apache.spark.graph.util.GraphGenerators
-
-
-object GridPageRank {
- def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
- val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
- val outDegree = Array.fill(nRows * nCols)(0)
- // Convert row column address into vertex ids (row major order)
- def sub2ind(r: Int, c: Int): Int = r * nCols + c
- // Make the grid graph
- for (r <- 0 until nRows; c <- 0 until nCols) {
- val ind = sub2ind(r,c)
- if (r+1 < nRows) {
- outDegree(ind) += 1
- inNbrs(sub2ind(r+1,c)) += ind
- }
- if (c+1 < nCols) {
- outDegree(ind) += 1
- inNbrs(sub2ind(r,c+1)) += ind
- }
- }
- // compute the pagerank
- var pr = Array.fill(nRows * nCols)(resetProb)
- for (iter <- 0 until nIter) {
- val oldPr = pr
- pr = new Array[Double](nRows * nCols)
- for (ind <- 0 until (nRows * nCols)) {
- pr(ind) = resetProb + (1.0 - resetProb) *
- inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
- }
- }
- (0L until (nRows * nCols)).zip(pr)
- }
-
-}
-
-
-class AnalyticsSuite extends FunSuite with LocalSparkContext {
-
- def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
- a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
- .map { case (id, error) => error }.sum
- }
-
- test("Star PageRank") {
- withSpark { sc =>
- val nVertices = 100
- val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
- val resetProb = 0.15
- val errorTol = 1.0e-5
-
- val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
- val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
-
- // Static PageRank should only take 2 iterations to converge
- val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
- if (pr1 != pr2) 1 else 0
- }.map { case (vid, test) => test }.sum
- assert(notMatching === 0)
-
- val staticErrors = staticRanks2.map { case (vid, pr) =>
- val correct = (vid > 0 && pr == resetProb) ||
- (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
- if (!correct) 1 else 0
- }
- assert(staticErrors.sum === 0)
-
- val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
- assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
- assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
- }
- } // end of test Star PageRank
-
-
-
- test("Grid PageRank") {
- withSpark { sc =>
- val rows = 10
- val cols = 10
- val resetProb = 0.15
- val tol = 0.0001
- val numIter = 50
- val errorTol = 1.0e-5
- val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
-
- val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
- val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
- val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
-
- assert(compareRanks(staticRanks, referenceRanks) < errorTol)
- assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
- assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
- }
- } // end of Grid PageRank
-
-
- test("Chain PageRank") {
- withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
- val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
- val resetProb = 0.15
- val tol = 0.0001
- val numIter = 10
- val errorTol = 1.0e-5
-
- val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
- val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
-
- assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
- assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
- }
- }
-
-
- test("Grid Connected Components") {
- withSpark { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
- val ccGraph = ConnectedComponents.run(gridGraph).cache()
- val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
- assert(maxCCid === 0)
- }
- } // end of Grid connected components
-
-
- test("Reverse Grid Connected Components") {
- withSpark { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
- val ccGraph = ConnectedComponents.run(gridGraph).cache()
- val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
- assert(maxCCid === 0)
- }
- } // end of Grid connected components
-
-
- test("Chain Connected Components") {
- withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val chain2 = (10 until 20).map(x => (x, x+1) )
- val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
- val ccGraph = ConnectedComponents.run(twoChains).cache()
- val vertices = ccGraph.vertices.collect()
- for ( (id, cc) <- vertices ) {
- if(id < 10) { assert(cc === 0) }
- else { assert(cc === 10) }
- }
- val ccMap = vertices.toMap
- for (id <- 0 until 20) {
- if (id < 10) {
- assert(ccMap(id) === 0)
- } else {
- assert(ccMap(id) === 10)
- }
- }
- }
- } // end of chain connected components
-
- test("Reverse Chain Connected Components") {
- withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val chain2 = (10 until 20).map(x => (x, x+1) )
- val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
- val ccGraph = ConnectedComponents.run(twoChains).cache()
- val vertices = ccGraph.vertices.collect
- for ( (id, cc) <- vertices ) {
- if (id < 10) {
- assert(cc === 0)
- } else {
- assert(cc === 10)
- }
- }
- val ccMap = vertices.toMap
- for ( id <- 0 until 20 ) {
- if (id < 10) {
- assert(ccMap(id) === 0)
- } else {
- assert(ccMap(id) === 10)
- }
- }
- }
- } // end of reverse chain connected components
-
- test("Island Strongly Connected Components") {
- withSpark { sc =>
- val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
- val edges = sc.parallelize(Seq.empty[Edge[Int]])
- val graph = Graph(vertices, edges)
- val sccGraph = StronglyConnectedComponents.run(graph, 5)
- for ((id, scc) <- sccGraph.vertices.collect) {
- assert(id == scc)
- }
- }
- }
-
- test("Cycle Strongly Connected Components") {
- withSpark { sc =>
- val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
- val graph = Graph.fromEdgeTuples(rawEdges, -1)
- val sccGraph = StronglyConnectedComponents.run(graph, 20)
- for ((id, scc) <- sccGraph.vertices.collect) {
- assert(0L == scc)
- }
- }
- }
-
- test("2 Cycle Strongly Connected Components") {
- withSpark { sc =>
- val edges =
- Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
- Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
- Array(6L -> 0L, 5L -> 7L)
- val rawEdges = sc.parallelize(edges)
- val graph = Graph.fromEdgeTuples(rawEdges, -1)
- val sccGraph = StronglyConnectedComponents.run(graph, 20)
- for ((id, scc) <- sccGraph.vertices.collect) {
- if (id < 3)
- assert(0L == scc)
- else if (id < 6)
- assert(3L == scc)
- else
- assert(id == scc)
- }
- }
- }
-
- test("Count a single triangle") {
- withSpark { sc =>
- val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
- val verts = triangleCount.vertices
- verts.collect.foreach { case (vid, count) => assert(count === 1) }
- }
- }
-
- test("Count two triangles") {
- withSpark { sc =>
- val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
- Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
- val rawEdges = sc.parallelize(triangles, 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
- val verts = triangleCount.vertices
- verts.collect().foreach { case (vid, count) =>
- if (vid == 0) {
- assert(count === 2)
- } else {
- assert(count === 1)
- }
- }
- }
- }
-
- test("Count two triangles with bi-directed edges") {
- withSpark { sc =>
- val triangles =
- Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
- Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
- val revTriangles = triangles.map { case (a,b) => (b,a) }
- val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
- val verts = triangleCount.vertices
- verts.collect().foreach { case (vid, count) =>
- if (vid == 0) {
- assert(count === 4)
- } else {
- assert(count === 2)
- }
- }
- }
- }
-
- test("Count a single triangle with duplicate edges") {
- withSpark { sc =>
- val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
- Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
- val triangleCount = TriangleCount.run(graph)
- val verts = triangleCount.vertices
- verts.collect.foreach { case (vid, count) => assert(count === 1) }
- }
- }
-
- test("Test SVD++ with mean square error on training set") {
- withSpark { sc =>
- val SvdppErr = 0.01
- val edges = sc.textFile("mllib/data/als/test.data").map { line =>
- val fields = line.split(",")
- Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
- }
- val graph = Svdpp.run(edges)
- val err = graph.vertices.collect.map{ case (vid, vd) =>
- if (vid % 2 == 1) { vd.norm } else { 0.0 }
- }.reduce(_ + _) / graph.triplets.collect.size
- assert(err < SvdppErr)
- }
- }
-} // end of AnalyticsSuite
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
index c055e461b7..9e9213631f 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
@@ -1,14 +1,10 @@
package org.apache.spark.graph
-import scala.util.Random
-
-import org.scalatest.FunSuite
-
import org.apache.spark.SparkContext
import org.apache.spark.graph.Graph._
import org.apache.spark.graph.impl.EdgePartition
-import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._
+import org.scalatest.FunSuite
class GraphOpsSuite extends FunSuite with LocalSparkContext {
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..81a1b7337f
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
@@ -0,0 +1,83 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ assert(maxCCid === 0)
+ }
+ } // end of Grid connected components
+
+
+ test("Reverse Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ assert(maxCCid === 0)
+ }
+ } // end of Grid connected components
+
+
+ test("Chain Connected Components") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val chain2 = (10 until 20).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect()
+ for ( (id, cc) <- vertices ) {
+ if(id < 10) { assert(cc === 0) }
+ else { assert(cc === 10) }
+ }
+ val ccMap = vertices.toMap
+ for (id <- 0 until 20) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of chain connected components
+
+ test("Reverse Chain Connected Components") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val chain2 = (10 until 20).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect
+ for ( (id, cc) <- vertices ) {
+ if (id < 10) {
+ assert(cc === 0)
+ } else {
+ assert(cc === 10)
+ }
+ }
+ val ccMap = vertices.toMap
+ for ( id <- 0 until 20 ) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of reverse chain connected components
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
new file mode 100644
index 0000000000..81d82a5a6b
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
@@ -0,0 +1,126 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graph._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import org.apache.spark.graph.util.GraphGenerators
+
+
+object GridPageRank {
+ def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+ val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+ val outDegree = Array.fill(nRows * nCols)(0)
+ // Convert row column address into vertex ids (row major order)
+ def sub2ind(r: Int, c: Int): Int = r * nCols + c
+ // Make the grid graph
+ for (r <- 0 until nRows; c <- 0 until nCols) {
+ val ind = sub2ind(r,c)
+ if (r+1 < nRows) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r+1,c)) += ind
+ }
+ if (c+1 < nCols) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r,c+1)) += ind
+ }
+ }
+ // compute the pagerank
+ var pr = Array.fill(nRows * nCols)(resetProb)
+ for (iter <- 0 until nIter) {
+ val oldPr = pr
+ pr = new Array[Double](nRows * nCols)
+ for (ind <- 0 until (nRows * nCols)) {
+ pr(ind) = resetProb + (1.0 - resetProb) *
+ inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+ }
+ }
+ (0L until (nRows * nCols)).zip(pr)
+ }
+
+}
+
+
+class PageRankSuite extends FunSuite with LocalSparkContext {
+
+ def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+ a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
+ .map { case (id, error) => error }.sum
+ }
+
+ test("Star PageRank") {
+ withSpark { sc =>
+ val nVertices = 100
+ val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+ val resetProb = 0.15
+ val errorTol = 1.0e-5
+
+ val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+ val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+
+ // Static PageRank should only take 2 iterations to converge
+ val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+ if (pr1 != pr2) 1 else 0
+ }.map { case (vid, test) => test }.sum
+ assert(notMatching === 0)
+
+ val staticErrors = staticRanks2.map { case (vid, pr) =>
+ val correct = (vid > 0 && pr == resetProb) ||
+ (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+ if (!correct) 1 else 0
+ }
+ assert(staticErrors.sum === 0)
+
+ val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+ assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
+ }
+ } // end of test Star PageRank
+
+
+
+ test("Grid PageRank") {
+ withSpark { sc =>
+ val rows = 10
+ val cols = 10
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 50
+ val errorTol = 1.0e-5
+ val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+ val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+ val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+ assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+ assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
+ }
+ } // end of Grid PageRank
+
+
+ test("Chain PageRank") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+ val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 10
+ val errorTol = 1.0e-5
+
+ val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+ }
+ }
+}
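
For reference, the GridPageRank helper above computes the expected ranks locally: starting from pr_i^{(0)} = resetProb, it runs nIter iterations of the standard update, and compareRanks scores two rank vectors by their total squared difference (missing vertices treated as rank 0), which the tests require to stay under errorTol = 1.0e-5. In LaTeX, with \rho = resetProb, N^{-}(i) the in-neighbors of vertex i, and \deg^{+}(j) the out-degree of j:

    pr_i^{(t+1)} = \rho + (1 - \rho) \sum_{j \in N^{-}(i)} \frac{pr_j^{(t)}}{\deg^{+}(j)},
    \qquad
    \mathrm{err}(a, b) = \sum_i \left( a_i - b_i \right)^2 < 10^{-5}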
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..4afb158a68
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
@@ -0,0 +1,57 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Island Strongly Connected Components") {
+ withSpark { sc =>
+ val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
+ val edges = sc.parallelize(Seq.empty[Edge[Int]])
+ val graph = Graph(vertices, edges)
+ val sccGraph = StronglyConnectedComponents.run(graph, 5)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(id == scc)
+ }
+ }
+ }
+
+ test("Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(0L == scc)
+ }
+ }
+ }
+
+ test("2 Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val edges =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
+ Array(6L -> 0L, 5L -> 7L)
+ val rawEdges = sc.parallelize(edges)
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ if (id < 3)
+ assert(0L == scc)
+ else if (id < 6)
+ assert(3L == scc)
+ else
+ assert(id == scc)
+ }
+ }
+ }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
new file mode 100644
index 0000000000..4ea675c2dc
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
@@ -0,0 +1,29 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class SvdppSuite extends FunSuite with LocalSparkContext {
+
+ test("Test SVD++ with mean square error on training set") {
+ withSpark { sc =>
+ val SvdppErr = 0.01
+ val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+ val fields = line.split(",")
+ Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
+ }
+ val graph = Svdpp.run(edges)
+ val err = graph.vertices.collect.map{ case (vid, vd) =>
+ if (vid % 2 == 1) { vd.norm } else { 0.0 }
+ }.reduce(_ + _) / graph.triplets.collect.size
+ assert(err < SvdppErr)
+ }
+ }
+
+}
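
The SVD++ test above loads the MLlib ALS sample ratings, maps users to even vertex ids and items to odd vertex ids, and divides an accumulated error by the number of rating edges. Assuming vd.norm is the squared training error that Svdpp.run leaves on each item vertex (an assumption about the Svdpp vertex data, not stated in this patch), the asserted quantity is roughly

    err = \frac{1}{|E|} \sum_{\text{odd}\ vid} \mathrm{norm}(vid) < 0.01

where |E| is the number of rating edges in the training set.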
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala
new file mode 100644
index 0000000000..274ab11f0c
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala
@@ -0,0 +1,73 @@
+package org.apache.spark.graph.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class TriangleCountSuite extends FunSuite with LocalSparkContext {
+
+ test("Count a single triangle") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+ test("Count two triangles") {
+ withSpark { sc =>
+ val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+ val rawEdges = sc.parallelize(triangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 2)
+ } else {
+ assert(count === 1)
+ }
+ }
+ }
+ }
+
+ test("Count two triangles with bi-directed edges") {
+ withSpark { sc =>
+ val triangles =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+ val revTriangles = triangles.map { case (a,b) => (b,a) }
+ val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 4)
+ } else {
+ assert(count === 2)
+ }
+ }
+ }
+ }
+
+ test("Count a single triangle with duplicate edges") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index 2bce90120d..caedb55ea2 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -9,9 +9,10 @@ import org.apache.spark.graph.Graph._
import org.apache.spark.graph._
import org.apache.spark.rdd._
+
class EdgePartitionSuite extends FunSuite {
- test("EdgePartition.sort") {
+ test("sort") {
val edgesFrom0 = List(Edge(0, 1, 0))
val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
val sortedEdges = edgesFrom0 ++ edgesFrom1
@@ -26,7 +27,7 @@ class EdgePartitionSuite extends FunSuite {
assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
}
- test("EdgePartition.innerJoin") {
+ test("innerJoin") {
def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }