author     Ankur Dave <ankurdave@gmail.com>   2013-12-19 19:49:07 -0800
committer  Ankur Dave <ankurdave@gmail.com>   2013-12-19 19:49:07 -0800
commit     da9f5e3fc093a91e0e91bc9311d5f5d085dbc929 (patch)
tree       ff689f2434e026210cffd9d6306d4c1aad7d061b /graph/src
parent     da301b57fc7f606e2b8fd0acaf95aa1bd9b643b0 (diff)
Split GraphSuite; simplify LocalSparkContext
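This commit does two things, both visible in the diff below: the monolithic GraphSuite is split into focused suites (GraphOpsSuite, VertexRDDSuite, and EdgePartitionSuite are new files), and the LocalSparkContext test trait is reduced to a single withSpark helper that creates a local SparkContext, runs the test body, and stops the context itself. Every call site therefore changes from passing in a SparkContext to just passing a function. A minimal sketch of the call-site change, assuming a FunSuite that mixes in LocalSparkContext (the test name and body here are illustrative, not taken from the diff):

// Before: each test constructed and passed its own SparkContext.
test("example") {
  withSpark(new SparkContext("local", "test")) { sc =>
    assert(sc.parallelize(1 to 3).count() === 3)
  }
}

// After: the trait's withSpark creates, passes, and stops the context.
test("example") {
  withSpark { sc =>
    assert(sc.parallelize(1 to 3).count() === 3)
  }
}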
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala          |  39
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala           |  94
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala              | 342
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala       |  44
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala             |   8
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala         |   6
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala          |  32
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala |  43
8 files changed, 344 insertions(+), 264 deletions(-)
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index d36339b65b..1e6d8ec7cf 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -7,8 +7,6 @@ import org.apache.spark.SparkContext._
import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd._
-import org.apache.spark.graph.LocalSparkContext._
-
import org.apache.spark.graph.util.GraphGenerators
@@ -48,16 +46,13 @@ object GridPageRank {
class AnalyticsSuite extends FunSuite with LocalSparkContext {
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
.map { case (id, error) => error }.sum
}
test("Star PageRank") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val nVertices = 100
val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
val resetProb = 0.15
@@ -89,7 +84,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Grid PageRank") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val rows = 10
val cols = 10
val resetProb = 0.15
@@ -111,7 +106,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Chain PageRank") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
@@ -131,7 +126,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Grid Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
val ccGraph = ConnectedComponents.run(gridGraph).cache()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
@@ -141,7 +136,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Reverse Grid Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
val ccGraph = ConnectedComponents.run(gridGraph).cache()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
@@ -151,7 +146,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
test("Chain Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
@@ -174,7 +169,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
} // end of chain connected components
test("Reverse Chain Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
@@ -200,7 +195,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
} // end of reverse chain connected components
test("Island Strongly Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
val edges = sc.parallelize(Seq.empty[Edge[Int]])
val graph = Graph(vertices, edges)
@@ -212,7 +207,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Cycle Strongly Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
val graph = Graph.fromEdgeTuples(rawEdges, -1)
val sccGraph = StronglyConnectedComponents.run(graph, 20)
@@ -223,7 +218,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("2 Cycle Strongly Connected Components") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val edges =
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
@@ -243,7 +238,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Count a single triangle") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = TriangleCount.run(graph)
@@ -253,7 +248,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Count two triangles") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
@@ -271,7 +266,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Count two triangles with bi-directed edges") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val triangles =
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
@@ -291,7 +286,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Count a single triangle with duplicate edges") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
@@ -302,14 +297,14 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
}
test("Test SVD++ with mean square error on training set") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val SvdppErr = 0.01
- val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+ val edges = sc.textFile("mllib/data/als/test.data").map { line =>
val fields = line.split(",")
Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
}
val graph = Svdpp.run(edges)
- val err = graph.vertices.collect.map{ case (vid, vd) =>
+ val err = graph.vertices.collect.map{ case (vid, vd) =>
if (vid % 2 == 1) { vd.norm } else { 0.0 }
}.reduce(_ + _) / graph.triplets.collect.size
assert(err < SvdppErr)
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
new file mode 100644
index 0000000000..c055e461b7
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.graph
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.rdd._
+
+class GraphOpsSuite extends FunSuite with LocalSparkContext {
+
+ test("aggregateNeighbors") {
+ withSpark { sc =>
+ val n = 3
+ val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
+
+ val indegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.In)
+ assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
+
+ val outdegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.Out)
+ assert(outdegrees.collect().toSet === Set((0, n)))
+
+ val noVertexValues = star.aggregateNeighbors[Int](
+ (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
+ (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
+ EdgeDirection.In)
+ assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+ }
+ }
+
+ test("joinVertices") {
+ withSpark { sc =>
+ val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+ val g: Graph[String, String] = Graph(vertices, edges)
+
+ val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+
+ val v = g1.vertices.collect().toSet
+ assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
+ }
+ }
+
+ test("collectNeighborIds") {
+ withSpark { sc =>
+ val chain = (0 until 100).map(x => (x, (x+1)%100) )
+ val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
+ val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
+ assert(nbrs.count === chain.size)
+ assert(graph.numVertices === nbrs.count)
+ nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
+ nbrs.collect.foreach { case (vid, nbrs) =>
+ val s = nbrs.toSet
+ assert(s.contains((vid + 1) % 100))
+ assert(s.contains(if (vid > 0) vid - 1 else 99 ))
+ }
+ }
+ }
+
+ test ("filter") {
+ withSpark { sc =>
+ val n = 5
+ val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+ val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+ val graph: Graph[Int, Int] = Graph(vertices, edges)
+ val filteredGraph = graph.filter(
+ graph => {
+ val degrees: VertexRDD[Int] = graph.outDegrees
+ graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
+ },
+ vpred = (vid: Vid, deg:Int) => deg > 0
+ )
+
+ val v = filteredGraph.vertices.collect().toSet
+ assert(v === Set((0,0)))
+
+ // the map is necessary because of object-reuse in the edge iterator
+ val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+ assert(e.isEmpty)
+ }
+ }
+
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index f6bb201a83..09da102350 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -6,32 +6,47 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graph.Graph._
-import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+ def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ }
- test("Graph Creation") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val rawEdges = (0L to 100L).zip((1L to 99L) :+ 0L)
- val edges = sc.parallelize(rawEdges)
- val graph = Graph.fromEdgeTuples(edges, 1.0F)
- assert(graph.edges.count() === rawEdges.size)
+ test("Graph.fromEdgeTuples") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L)
+ val doubleRing = ring ++ ring
+ val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
+ assert(graph.edges.count() === doubleRing.size)
+ assert(graph.edges.collect.forall(e => e.attr == 1))
+
+ // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
+ val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
+ assert(uniqueGraph.edges.count() === ring.size)
+ assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
}
}
- test("Graph Creation with invalid vertices") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ test("Graph.fromEdges") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+ val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)
+ assert(graph.edges.count() === ring.size)
+ }
+ }
+
+ test("Graph.apply") {
+ withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size )
+ // Vertices not explicitly provided but referenced by edges should be created automatically
assert( graph.vertices.count() === 100)
graph.triplets.map { et =>
assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
@@ -40,153 +55,97 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
- test("core operations") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ test("triplets") {
+ withSpark { sc =>
val n = 5
- val star = Graph.fromEdgeTuples(
- sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
- // triplets
+ val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
(1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
- // reverse
- val reverseStar = star.reverse
- assert(reverseStar.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
- // outerJoinVertices
- val reverseStarDegrees =
- reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
- val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
- et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
- (a: Int, b: Int) => a + b).collect.toSet
- assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+ }
+ }
+
+ test("partitionBy") {
+ withSpark { sc =>
+ def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+ def nonemptyParts(graph: Graph[Int, Int]) = {
+ graph.edges.partitionsRDD.mapPartitions { iter =>
+ Iterator(iter.next()._2.iterator.toList)
+ }.filter(_.nonEmpty)
+ }
+ val identicalEdges = List((0L, 1L), (0L, 1L))
+ val canonicalEdges = List((0L, 1L), (1L, 0L))
+ val sameSrcEdges = List((0L, 1L), (0L, 2L))
+
+ // The two edges start out in different partitions
+ for (edges <- List(identicalEdges, canonicalEdges, sameSrcEdges)) {
+ assert(nonemptyParts(mkGraph(edges)).count === 2)
+ }
+ // partitionBy(RandomVertexCut) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1)
+ // partitionBy(EdgePartition1D) puts same-source edges in the same partition
+ assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
+ // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
+ // the same partition
+ assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+ // TODO(ankurdave): Test EdgePartition2D by checking the 2 * sqrt(p) bound on vertex
+ // replication
+ }
+ }
+
+ test("mapVertices") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
// mapVertices preserving type
- val mappedVAttrs = reverseStar.mapVertices((vid, attr) => attr + "2")
+ val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
// mapVertices changing type
- val mappedVAttrs2 = reverseStar.mapVertices((vid, attr) => attr.length)
+ val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
- // groupEdges
- val doubleStar = Graph.fromEdgeTuples(
- sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
- val star2 = doubleStar.groupEdges { (a, b) => a}
- assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
- star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
- assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
}
}
test("mapEdges") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val n = 3
- val star = Graph.fromEdgeTuples(
- sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
+ val star = starGraph(sc, n)
val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
- // map(_.copy()) is a workaround for https://github.com/amplab/graphx/issues/25
- val edges = starWithEdgeAttrs.edges.map(_.copy()).collect()
+ val edges = starWithEdgeAttrs.edges.collect()
assert(edges.size === n)
assert(edges.toSet === (1 to n).map(x => Edge(0, x, x)).toSet)
}
}
- test("mapReduceTriplets") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val n = 5
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 0)
- val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
- val neighborDegreeSums = starDeg.mapReduceTriplets(
- edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
- (a: Int, b: Int) => a + b)
- assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
-
- // activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
- val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
- val vids = complete.mapVertices((vid, attr) => vid).cache()
- val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
- val numEvenNeighbors = vids.mapReduceTriplets(et => {
- // Map function should only run on edges with destination in the active set
- if (et.dstId % 2 != 0) {
- throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
- }
- Iterator((et.srcId, 1))
- }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
-
- // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
- .mapVertices((vid, attr) => vid).cache()
- val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
- val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
- val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
- // Map function should only run on edges with source in the active set
- if (et.srcId % 2 != 1) {
- throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
- }
- Iterator((et.dstId, 1))
- }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
-
- }
+ test("mapTriplets") {
}
- test("aggregateNeighbors") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val n = 3
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
-
- val indegrees = star.aggregateNeighbors(
- (vid, edge) => Some(1),
- (a: Int, b: Int) => a + b,
- EdgeDirection.In)
- assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
-
- val outdegrees = star.aggregateNeighbors(
- (vid, edge) => Some(1),
- (a: Int, b: Int) => a + b,
- EdgeDirection.Out)
- assert(outdegrees.collect().toSet === Set((0, n)))
-
- val noVertexValues = star.aggregateNeighbors[Int](
- (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
- (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
- EdgeDirection.In)
- assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+ test("reverse") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
}
}
- test("joinVertices") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
- val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
- val g: Graph[String, String] = Graph(vertices, edges)
-
- val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
- val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+ test("subgraph") {
+ withSpark { sc =>
+      // Create a star graph of 10 vertices.
+ val n = 10
+ val star = starGraph(sc, n)
+ // Take only vertices whose vids are even
+ val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
- val v = g1.vertices.collect().toSet
- assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
- }
- }
+ // We should have 5 vertices.
+ assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
- test("collectNeighborIds") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val chain = (0 until 100).map(x => (x, (x+1)%100) )
- val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
- val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
- assert(nbrs.count === chain.size)
- assert(graph.numVertices === nbrs.count)
- nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
- nbrs.collect.foreach { case (vid, nbrs) =>
- val s = nbrs.toSet
- assert(s.contains((vid + 1) % 100))
- assert(s.contains(if (vid > 0) vid - 1 else 99 ))
- }
+ // And 4 edges.
+ assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
test("mask") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val n = 5
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
@@ -209,90 +168,71 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
- test ("filter") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ test("groupEdges") {
+ withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
- val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
- val graph: Graph[Int, Int] = Graph(vertices, edges)
- val filteredGraph = graph.filter(
- graph => {
- val degrees: VertexRDD[Int] = graph.outDegrees
- graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
- },
- vpred = (vid: Vid, deg:Int) => deg > 0
- )
-
- val v = filteredGraph.vertices.collect().toSet
- assert(v === Set((0,0)))
-
- // the map is necessary because of object-reuse in the edge iterator
- val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
- assert(e.isEmpty)
+ val star = starGraph(sc, n)
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
}
}
- test("VertexSetRDD") {
- withSpark(new SparkContext("local", "test")) { sc =>
- val n = 100
- val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
- val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
- assert(b.count === n + 1)
- assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
- val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
- assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
- val d = c.filter(q => ((q._2 % 2) == 0))
- val e = a.filter(q => ((q._2 % 2) == 0))
- assert(d.count === e.count)
- assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
- val f = b.mapValues(x => if (x % 2 == 0) -x else x)
- assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
- }
- }
+ test("mapReduceTriplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n).mapVertices { (_, _) => 0 }
+ val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
+ val neighborDegreeSums = starDeg.mapReduceTriplets(
+ edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
+ (a: Int, b: Int) => a + b)
+ assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
- test("subgraph") {
- withSpark(new SparkContext("local", "test")) { sc =>
- // Create a star graph of 10 veritces.
- val n = 10
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), "v")
- // Take only vertices whose vids are even
- val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+ // activeSetOpt
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
+ val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
+ val vids = complete.mapVertices((vid, attr) => vid).cache()
+ val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
+ val numEvenNeighbors = vids.mapReduceTriplets(et => {
+ // Map function should only run on edges with destination in the active set
+ if (et.dstId % 2 != 0) {
+ throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
+ }
+ Iterator((et.srcId, 1))
+ }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+ assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
- // We should have 5 vertices.
- assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
+ // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
+ val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
+ .mapVertices((vid, attr) => vid).cache()
+ val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
+ // Map function should only run on edges with source in the active set
+ if (et.srcId % 2 != 1) {
+ throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
+ }
+ Iterator((et.dstId, 1))
+ }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
- // And 4 edges.
- assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
- test("EdgePartition.sort") {
- val edgesFrom0 = List(Edge(0, 1, 0))
- val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
- val sortedEdges = edgesFrom0 ++ edgesFrom1
- val builder = new EdgePartitionBuilder[Int]
- for (e <- Random.shuffle(sortedEdges)) {
- builder.add(e.srcId, e.dstId, e.attr)
+ test("outerJoinVertices") {
+ withSpark { sc =>
+ val n = 5
+ val reverseStar = starGraph(sc, n).reverse
+ val reverseStarDegrees =
+ reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+ val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => a + b).collect.toSet
+ assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
}
-
- val edgePartition = builder.toEdgePartition
- assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
- assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
- assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
}
- test("EdgePartition.innerJoin") {
- def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
- val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
- builder.toEdgePartition
- }
- val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
- val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
- val a = makeEdgePartition(aList)
- val b = makeEdgePartition(bList)
-
- assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
- List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
- }
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
index 4a0155b6bd..5c20d559aa 100644
--- a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
@@ -6,39 +6,23 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkContext
-/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
-
- @transient var sc: SparkContext = _
-
- override def afterEach() {
- resetSparkContext()
- super.afterEach()
- }
-
- def resetSparkContext() = {
- if (sc != null) {
- LocalSparkContext.stop(sc)
- sc = null
- }
- }
-
-}
-
-object LocalSparkContext {
- def stop(sc: SparkContext) {
- sc.stop()
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- }
-
- /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
- def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+/**
+ * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
+ * after each test.
+*/
+trait LocalSparkContext {
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+
+ /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
+ def withSpark[T](f: SparkContext => T) = {
+ val sc = new SparkContext("local", "test")
try {
f(sc)
} finally {
- stop(sc)
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
}
}
-
}
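With this change the trait is self-contained: the Kryo serializer properties are set once here instead of in every suite, and each test gets a fresh local SparkContext that is always stopped, so suites no longer need an sc field or BeforeAndAfterEach hooks. A minimal sketch of a suite built on the revised trait (the suite name and test are hypothetical, shown only to illustrate the intended usage):

package org.apache.spark.graph

import org.scalatest.FunSuite

class ExampleSuite extends FunSuite with LocalSparkContext {
  test("withSpark provides a fresh local context") {
    withSpark { sc =>
      // The context is created by the trait and stopped after this block.
      val rdd = sc.parallelize(1 to 10, 2)
      assert(rdd.count() === 10)
    }
  }
}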
diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
index 0897d9783e..44182e85ee 100644
--- a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
@@ -3,16 +3,12 @@ package org.apache.spark.graph
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
-import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.rdd._
class PregelSuite extends FunSuite with LocalSparkContext {
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
test("1 iteration") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val n = 5
val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
val result = Pregel(star, 0)(
@@ -24,7 +20,7 @@ class PregelSuite extends FunSuite with LocalSparkContext {
}
test("chain propagation") {
- withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
index 6b86f9b25d..80075f3437 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
@@ -7,7 +7,6 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark._
-import org.apache.spark.graph.LocalSparkContext._
import org.apache.spark.graph.impl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark.serializer.SerializationStream
@@ -15,9 +14,6 @@ import org.apache.spark.serializer.SerializationStream
class SerializerSuite extends FunSuite with LocalSparkContext {
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
test("IntVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
@@ -139,7 +135,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
test("TestShuffleVertexBroadcastMsg") {
- withSpark(new SparkContext("local[2]", "test")) { sc =>
+ withSpark { sc =>
val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
new VertexBroadcastMsg[Int](pid, pid, pid)
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala b/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala
new file mode 100644
index 0000000000..316968bbf0
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.graph
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.rdd._
+
+class VertexRDDSuite extends FunSuite with LocalSparkContext {
+
+ test("VertexRDD") {
+ withSpark { sc =>
+ val n = 100
+ val a = sc.parallelize((0 to n).map(x => (x.toLong, x.toLong)), 5)
+ val b = VertexRDD(a).mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
+ assert(b.count === n + 1)
+ assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
+ val c = b.aggregateUsingIndex[Long](a, (x, y) => x)
+ assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
+ val d = c.filter(q => ((q._2 % 2) == 0))
+ val e = a.filter(q => ((q._2 % 2) == 0))
+ assert(d.count === e.count)
+ assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
+ val f = b.mapValues(x => if (x % 2 == 0) -x else x)
+ assert(b.diff(f).collect().toSet === (2 to n by 2).map(x => (x.toLong, x.toLong)).toSet)
+ }
+ }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
new file mode 100644
index 0000000000..2bce90120d
--- /dev/null
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.spark.graph.impl
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
+import org.apache.spark.graph._
+import org.apache.spark.rdd._
+
+class EdgePartitionSuite extends FunSuite {
+
+ test("EdgePartition.sort") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
+
+ test("EdgePartition.innerJoin") {
+ def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+ val builder = new EdgePartitionBuilder[A]
+ for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
+ builder.toEdgePartition
+ }
+ val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+ val a = makeEdgePartition(aList)
+ val b = makeEdgePartition(bList)
+
+ assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
+ List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
+ }
+}