author     Ankur Dave <ankurdave@gmail.com>   2014-01-09 14:31:33 -0800
committer  Ankur Dave <ankurdave@gmail.com>   2014-01-09 14:31:33 -0800
commit     731f56f309914e3fc7c22c8ef1c8cb9dd40d42c1 (patch)
tree       51fa693c046869f0706337046a1581c09e56e4b5 /graphx/src/test
parent     100718bcd3f6ade1a93256458ec1528bb9142b5e (diff)
graph -> graphx
Diffstat (limited to 'graphx/src/test')
-rw-r--r--  graphx/src/test/resources/log4j.properties | 28
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala | 92
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 272
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala | 28
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala | 41
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala | 183
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 85
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala | 83
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala | 126
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala | 30
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala | 57
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala | 73
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala | 76
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala | 113
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala | 93
15 files changed, 1380 insertions, 0 deletions
diff --git a/graphx/src/test/resources/log4j.properties b/graphx/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..896936d8c4
--- /dev/null
+++ b/graphx/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file graphx/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=graphx/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
new file mode 100644
index 0000000000..cc281fce99
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -0,0 +1,92 @@
+package org.apache.spark.graphx
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.rdd._
+import org.scalatest.FunSuite
+
+class GraphOpsSuite extends FunSuite with LocalSparkContext {
+
+ test("aggregateNeighbors") {
+ withSpark { sc =>
+ val n = 3
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
+
+ val indegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.In)
+ assert(indegrees.collect().toSet === (1 to n).map(x => (x, 1)).toSet)
+
+ val outdegrees = star.aggregateNeighbors(
+ (vid, edge) => Some(1),
+ (a: Int, b: Int) => a + b,
+ EdgeDirection.Out)
+ assert(outdegrees.collect().toSet === Set((0, n)))
+
+ val noVertexValues = star.aggregateNeighbors[Int](
+ (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
+ (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
+ EdgeDirection.In)
+ assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
+ }
+ }
+
+ test("joinVertices") {
+ withSpark { sc =>
+ val vertices =
+ sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
+ val g: Graph[String, String] = Graph(vertices, edges)
+
+ val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
+
+ val v = g1.vertices.collect().toSet
+ assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
+ }
+ }
+
+ test("collectNeighborIds") {
+ withSpark { sc =>
+ val chain = (0 until 100).map(x => (x, (x+1)%100) )
+ val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
+ val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
+ assert(nbrs.count === chain.size)
+ assert(graph.numVertices === nbrs.count)
+ nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
+ nbrs.collect.foreach { case (vid, nbrs) =>
+ val s = nbrs.toSet
+ assert(s.contains((vid + 1) % 100))
+ assert(s.contains(if (vid > 0) vid - 1 else 99 ))
+ }
+ }
+ }
+
+ test("filter") {
+ withSpark { sc =>
+ val n = 5
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+ val graph: Graph[Int, Int] = Graph(vertices, edges)
+ val filteredGraph = graph.filter(
+ graph => {
+ val degrees: VertexRDD[Int] = graph.outDegrees
+ graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
+ },
+ vpred = (vid: VertexID, deg:Int) => deg > 0
+ )
+
+ val v = filteredGraph.vertices.collect().toSet
+ assert(v === Set((0,0)))
+
+ // the map is necessary because of object-reuse in the edge iterator
+ val e = filteredGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+ assert(e.isEmpty)
+ }
+ }
+
+}
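A note on the map-before-collect in the filter test above: as its comment says, the edge iterator reuses a single Edge object, so edges have to be copied into fresh instances before being materialized on the driver (GraphSuite and EdgePartitionSuite use the same pattern via Edge(...) or _.copy()). A minimal sketch of that defensive copy as a standalone helper, assuming an arbitrary graph:

    import org.apache.spark.graphx.{Edge, Graph}

    // Copy each (reused) Edge into a fresh instance before collecting it on the driver.
    def collectEdges[VD, ED](graph: Graph[VD, ED]): Set[Edge[ED]] =
      graph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet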
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
new file mode 100644
index 0000000000..094fa722a0
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -0,0 +1,272 @@
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.rdd._
+
+class GraphSuite extends FunSuite with LocalSparkContext {
+
+ def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ }
+
+ test("Graph.fromEdgeTuples") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L)
+ val doubleRing = ring ++ ring
+ val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
+ assert(graph.edges.count() === doubleRing.size)
+ assert(graph.edges.collect.forall(e => e.attr == 1))
+
+ // uniqueEdges option should uniquify edges and store duplicate count in edge attributes
+ val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
+ assert(uniqueGraph.edges.count() === ring.size)
+ assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
+ }
+ }
+
+ test("Graph.fromEdges") {
+ withSpark { sc =>
+ val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
+ val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)
+ assert(graph.edges.count() === ring.size)
+ }
+ }
+
+ test("Graph.apply") {
+ withSpark { sc =>
+ val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
+ val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
+ val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val graph = Graph(vertices, edges, false)
+ assert( graph.edges.count() === rawEdges.size )
+ // Vertices not explicitly provided but referenced by edges should be created automatically
+ assert( graph.vertices.count() === 100)
+ graph.triplets.collect.foreach { et =>
+ assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
+ assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
+ }
+ }
+ }
+
+ test("triplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
+ (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
+ }
+ }
+
+ test("partitionBy") {
+ withSpark { sc =>
+ def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+ def nonemptyParts(graph: Graph[Int, Int]) = {
+ graph.edges.partitionsRDD.mapPartitions { iter =>
+ Iterator(iter.next()._2.iterator.toList)
+ }.filter(_.nonEmpty)
+ }
+ val identicalEdges = List((0L, 1L), (0L, 1L))
+ val canonicalEdges = List((0L, 1L), (1L, 0L))
+ val sameSrcEdges = List((0L, 1L), (0L, 2L))
+
+ // The two edges start out in different partitions
+ for (edges <- List(identicalEdges, canonicalEdges, sameSrcEdges)) {
+ assert(nonemptyParts(mkGraph(edges)).count === 2)
+ }
+ // partitionBy(RandomVertexCut) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1)
+ // partitionBy(EdgePartition1D) puts same-source edges in the same partition
+ assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
+ // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
+ // the same partition
+ assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+ // partitionBy(EdgePartition2D) puts identical edges in the same partition
+ assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
+
+ // partitionBy(EdgePartition2D) ensures that vertices need only be replicated to 2 * sqrt(p)
+ // partitions
+ val n = 100
+ val p = 100
+ val verts = 1 to n
+ val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
+ verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
+ assert(graph.edges.partitions.length === p)
+ val partitionedGraph = graph.partitionBy(EdgePartition2D)
+ assert(partitionedGraph.edges.partitions.length === p)
+ val bound = 2 * math.sqrt(p)
+ // Each vertex should be replicated to at most 2 * sqrt(p) partitions
+ val partitionSets = partitionedGraph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound))
+ // This should not be true for the default hash partitioning
+ val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter =>
+ val part = iter.next()._2
+ Iterator((part.srcIds ++ part.dstIds).toSet)
+ }.collect
+ assert(verts.exists(id => partitionSetsUnpartitioned.count(_.contains(id)) > bound))
+ }
+ }
+
+ test("mapVertices") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ // mapVertices preserving type
+ val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
+ // mapVertices changing type
+ val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
+ }
+ }
+
+ test("mapEdges") {
+ withSpark { sc =>
+ val n = 3
+ val star = starGraph(sc, n)
+ val starWithEdgeAttrs = star.mapEdges(e => e.dstId)
+
+ val edges = starWithEdgeAttrs.edges.collect()
+ assert(edges.size === n)
+ assert(edges.toSet === (1 to n).map(x => Edge(0, x, x)).toSet)
+ }
+ }
+
+ test("mapTriplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
+ (1L to n).map(x => Edge(0, x, "vv")).toSet)
+ }
+ }
+
+ test("reverse") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
+ }
+ }
+
+ test("subgraph") {
+ withSpark { sc =>
+ // Create a star graph with 10 spoke vertices.
+ val n = 10
+ val star = starGraph(sc, n)
+ // Take only vertices whose vids are even
+ val subgraph = star.subgraph(vpred = (vid, attr) => vid % 2 == 0)
+
+ // We should have 6 vertices: the center plus the 5 even-numbered spokes.
+ assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
+
+ // And 5 edges, one from the center to each even-numbered spoke.
+ assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
+ }
+ }
+
+ test("mask") {
+ withSpark { sc =>
+ val n = 5
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
+ val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
+ val graph: Graph[Int, Int] = Graph(vertices, edges)
+
+ val subgraph = graph.subgraph(
+ e => e.dstId != 4L,
+ (vid, vdata) => vid != 3L
+ ).mapVertices((vid, vdata) => -1).mapEdges(e => -1)
+
+ val projectedGraph = graph.mask(subgraph)
+
+ val v = projectedGraph.vertices.collect().toSet
+ assert(v === Set((0,0), (1,1), (2,2), (4,4), (5,5)))
+
+ // the map is necessary because of object-reuse in the edge iterator
+ val e = projectedGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect().toSet
+ assert(e === Set(Edge(0,1,1), Edge(0,2,2), Edge(0,5,5)))
+
+ }
+ }
+
+ test("groupEdges") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n)
+ val doubleStar = Graph.fromEdgeTuples(
+ sc.parallelize((1 to n).flatMap(x =>
+ List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
+ val star2 = doubleStar.groupEdges { (a, b) => a}
+ assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
+ test("mapReduceTriplets") {
+ withSpark { sc =>
+ val n = 5
+ val star = starGraph(sc, n).mapVertices { (_, _) => 0 }
+ val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
+ val neighborDegreeSums = starDeg.mapReduceTriplets(
+ edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
+ (a: Int, b: Int) => a + b)
+ assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
+
+ // activeSetOpt
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
+ val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
+ val vids = complete.mapVertices((vid, attr) => vid).cache()
+ val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
+ val numEvenNeighbors = vids.mapReduceTriplets(et => {
+ // Map function should only run on edges with destination in the active set
+ if (et.dstId % 2 != 0) {
+ throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
+ }
+ Iterator((et.srcId, 1))
+ }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
+
+ // outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ring = Graph.fromEdgeTuples(ringEdges, 0).mapVertices((vid, attr) => vid).cache()
+ val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
+ // Map function should only run on edges with source in the active set
+ if (et.srcId % 2 != 1) {
+ throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
+ }
+ Iterator((et.dstId, 1))
+ }, (a: Int, b: Int) => a + b, Some((changed, EdgeDirection.Out))).collect.toSet
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
+
+ }
+ }
+
+ test("outerJoinVertices") {
+ withSpark { sc =>
+ val n = 5
+ val reverseStar = starGraph(sc, n).reverse
+ // outerJoinVertices changing type
+ val reverseStarDegrees =
+ reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+ val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
+ et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => a + b).collect.toSet
+ assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
+ // outerJoinVertices preserving type
+ val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
+ val newReverseStar =
+ reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
+ assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+ (0 to n).map(x => "v%d".format(x)).toSet)
+ }
+ }
+
+}
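The 2 * sqrt(p) replication bound asserted in the partitionBy test above comes from the grid layout behind EdgePartition2D: the p partitions are arranged as a roughly sqrt(p) x sqrt(p) grid, the source vertex selects the row and the destination selects the column, so every edge touching a vertex lands in that vertex's row or column. A simplified sketch of the idea (not GraphX's exact EdgePartition2D, which additionally hashes the vertex ids before placing them on the grid):

    // Assign an edge to a cell of a ceil(sqrt(p)) x ceil(sqrt(p)) grid of partitions.
    // All edges incident to a vertex fall in its row (as source) or its column (as
    // destination), so the vertex is replicated to at most about 2 * sqrt(p) partitions.
    def gridPartition(src: Long, dst: Long, numParts: Int): Int = {
      val side = math.ceil(math.sqrt(numParts)).toInt
      val row = (math.abs(src) % side).toInt
      val col = (math.abs(dst) % side).toInt
      (row * side + col) % numParts
    }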
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
new file mode 100644
index 0000000000..6aec2ea8a9
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.graphx
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkContext
+
+
+/**
+ * Provides a method to run tests against a [[SparkContext]] that is correctly stopped
+ * after each test.
+ */
+trait LocalSparkContext {
+ System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+ /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
+ def withSpark[T](f: SparkContext => T) = {
+ val sc = new SparkContext("local", "test")
+ try {
+ f(sc)
+ } finally {
+ sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+ }
+}
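Since withSpark is parameterized as def withSpark[T](f: SparkContext => T), it returns whatever the test body produces after stopping the context in the finally block. A minimal hypothetical suite (not part of this patch) that uses the returned value:

    import org.scalatest.FunSuite
    import org.apache.spark.graphx._

    class ExampleSuite extends FunSuite with LocalSparkContext {
      test("vertex count survives context shutdown") {
        // withSpark stops the SparkContext before handing back the body's result.
        val numVertices = withSpark { sc =>
          Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 0).vertices.count()
        }
        assert(numVertices === 3)
      }
    }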
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
new file mode 100644
index 0000000000..429622357f
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -0,0 +1,41 @@
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd._
+
+class PregelSuite extends FunSuite with LocalSparkContext {
+
+ test("1 iteration") {
+ withSpark { sc =>
+ val n = 5
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
+ val result = Pregel(star, 0)(
+ (vid, attr, msg) => attr,
+ et => Iterator.empty,
+ (a: Int, b: Int) => throw new Exception("mergeMsg run unexpectedly"))
+ assert(result.vertices.collect.toSet === star.vertices.collect.toSet)
+ }
+ }
+
+ test("chain propagation") {
+ withSpark { sc =>
+ val n = 5
+ val chain = Graph.fromEdgeTuples(
+ sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
+ 0).cache()
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+ val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
+ assert(chainWithSeed.vertices.collect.toSet ===
+ Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
+ val result = Pregel(chainWithSeed, 0)(
+ (vid, attr, msg) => math.max(msg, attr),
+ et => Iterator((et.dstId, et.srcAttr)),
+ (a: Int, b: Int) => math.max(a, b))
+ assert(result.vertices.collect.toSet ===
+ chain.vertices.mapValues { (vid, attr) => attr + 1 }.collect.toSet)
+ }
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
new file mode 100644
index 0000000000..3ba412c1f8
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -0,0 +1,183 @@
+package org.apache.spark.graphx
+
+import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark._
+import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.impl.MsgRDDFunctions._
+import org.apache.spark.serializer.SerializationStream
+
+
+class SerializerSuite extends FunSuite with LocalSparkContext {
+
+ test("IntVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new IntVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("LongVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new LongVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("DoubleVertexBroadcastMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new DoubleVertexBroadcastMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
+ val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
+ assert(outMsg.vid === inMsg1.vid)
+ assert(outMsg.vid === inMsg2.vid)
+ assert(outMsg.data === inMsg1.data)
+ assert(outMsg.data === inMsg2.data)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("IntAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 5)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Int) = inStrm.readObject()
+ val inMsg2: (VertexID, Int) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("LongAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 1L << 32)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Long) = inStrm.readObject()
+ val inMsg2: (VertexID, Long) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("DoubleAggMsgSerializer") {
+ val conf = new SparkConf(false)
+ val outMsg = (4: VertexID, 5.0)
+ val bout = new ByteArrayOutputStream
+ val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
+ outStrm.writeObject(outMsg)
+ outStrm.writeObject(outMsg)
+ bout.flush()
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
+ val inMsg1: (VertexID, Double) = inStrm.readObject()
+ val inMsg2: (VertexID, Double) = inStrm.readObject()
+ assert(outMsg === inMsg1)
+ assert(outMsg === inMsg2)
+
+ intercept[EOFException] {
+ inStrm.readObject()
+ }
+ }
+
+ test("TestShuffleVertexBroadcastMsg") {
+ withSpark { sc =>
+ val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
+ new VertexBroadcastMsg[Int](pid, pid, pid)
+ }
+ bmsgs.partitionBy(new HashPartitioner(3)).collect()
+ }
+ }
+
+ test("variable long encoding") {
+ def testVarLongEncoding(v: Long, optimizePositive: Boolean) {
+ val bout = new ByteArrayOutputStream
+ val stream = new ShuffleSerializationStream(bout) {
+ def writeObject[T](t: T): SerializationStream = {
+ writeVarLong(t.asInstanceOf[Long], optimizePositive = optimizePositive)
+ this
+ }
+ }
+ stream.writeObject(v)
+
+ val bin = new ByteArrayInputStream(bout.toByteArray)
+ val dstream = new ShuffleDeserializationStream(bin) {
+ def readObject[T](): T = {
+ readVarLong(optimizePositive).asInstanceOf[T]
+ }
+ }
+ val read = dstream.readObject[Long]()
+ assert(read === v)
+ }
+
+ // Test all variable-length encoding code paths (each branch covers 7 bits, i.e. values near each 1L << (7 * k))
+ val d = Random.nextLong() % 128
+ Seq[Long](0, (1L << 0) + d, (1L << 7) + d, (1L << 14) + d, (1L << 21) + d, (1L << 28) + d, (1L << 35) + d,
+ (1L << 42) + d, (1L << 49) + d, (1L << 56) + d, (1L << 63) + d).foreach { number =>
+ testVarLongEncoding(number, optimizePositive = false)
+ testVarLongEncoding(number, optimizePositive = true)
+ testVarLongEncoding(-number, optimizePositive = false)
+ testVarLongEncoding(-number, optimizePositive = true)
+ }
+ }
+}
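For context, the variable-length encoding exercised by the last test packs a Long into groups of 7 bits, one group per byte, with the top bit of each byte used as a continuation flag. A standalone sketch of the idea (not Spark's ShuffleSerializationStream itself, which also offers the optimizePositive mode tested above):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    object VarLongSketch {
      // Write the low 7 bits at a time, setting the high bit while more bits remain.
      def writeVarLong(out: ByteArrayOutputStream, value: Long): Unit = {
        var v = value
        while ((v & ~0x7FL) != 0L) {
          out.write(((v & 0x7FL) | 0x80L).toInt) // payload bits + continuation flag
          v >>>= 7
        }
        out.write(v.toInt) // final byte, continuation flag clear
      }

      // Reassemble the 7-bit groups, least significant group first.
      def readVarLong(in: ByteArrayInputStream): Long = {
        var result = 0L
        var shift = 0
        var b = in.read()
        while ((b & 0x80) != 0) {
          result |= (b & 0x7FL) << shift
          shift += 7
          b = in.read()
        }
        result | (b.toLong << shift)
      }

      def main(args: Array[String]): Unit = {
        val values = Seq(0L, 127L, 128L, 1L << 35, -1L)
        val bout = new ByteArrayOutputStream
        values.foreach(writeVarLong(bout, _))
        val bin = new ByteArrayInputStream(bout.toByteArray)
        assert(values.map(_ => readVarLong(bin)) == values) // round trip
      }
    }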
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
new file mode 100644
index 0000000000..573b708e89
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -0,0 +1,85 @@
+package org.apache.spark.graphx
+
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.rdd._
+import org.scalatest.FunSuite
+
+class VertexRDDSuite extends FunSuite with LocalSparkContext {
+
+ def vertices(sc: SparkContext, n: Int) = {
+ VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
+ }
+
+ test("filter") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ assert(evens.count === (0 to n).filter(_ % 2 == 0).size)
+ }
+ }
+
+ test("mapValues") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val negatives = verts.mapValues(x => -x).cache() // Allow joining b with a derived RDD of b
+ assert(negatives.count === n + 1)
+ }
+ }
+
+ test("diff") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val flipEvens = verts.mapValues(x => if (x % 2 == 0) -x else x)
+ // diff should keep only the changed vertices
+ assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
+ // diff should keep the vertex values from `other`
+ assert(flipEvens.diff(verts).map(_._2).collect().toSet === (2 to n by 2).toSet)
+ }
+ }
+
+ test("leftJoin") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ // leftJoin with another VertexRDD
+ assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
+ // leftJoin with an RDD
+ val evensRDD = evens.map(identity)
+ assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
+ }
+ }
+
+ test("innerJoin") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val evens = verts.filter(q => ((q._2 % 2) == 0))
+ // innerJoin with another VertexRDD
+ assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet)
+ // innerJoin with an RDD
+ val evensRDD = evens.map(identity)
+ assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
+ (0 to n by 2).map(x => (x.toLong, 0)).toSet) }
+ }
+
+ test("aggregateUsingIndex") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n)
+ val messageTargets = (0 to n) ++ (0 to n by 2)
+ val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
+ assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
+ (0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..5e2ecfcde9
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
@@ -0,0 +1,83 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val ccSum = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ assert(ccSum === 0)
+ }
+ } // end of Grid connected components
+
+
+ test("Reverse Grid Connected Components") {
+ withSpark { sc =>
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
+ val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val ccSum = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ assert(ccSum === 0)
+ }
+ } // end of Grid connected components
+
+
+ test("Chain Connected Components") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val chain2 = (10 until 20).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect()
+ for ( (id, cc) <- vertices ) {
+ if(id < 10) { assert(cc === 0) }
+ else { assert(cc === 10) }
+ }
+ val ccMap = vertices.toMap
+ for (id <- 0 until 20) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of chain connected components
+
+ test("Reverse Chain Connected Components") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val chain2 = (10 until 20).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
+ val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
+ val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val vertices = ccGraph.vertices.collect
+ for ( (id, cc) <- vertices ) {
+ if (id < 10) {
+ assert(cc === 0)
+ } else {
+ assert(cc === 10)
+ }
+ }
+ val ccMap = vertices.toMap
+ for ( id <- 0 until 20 ) {
+ if (id < 10) {
+ assert(ccMap(id) === 0)
+ } else {
+ assert(ccMap(id) === 10)
+ }
+ }
+ }
+ } // end of reverse chain connected components
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
new file mode 100644
index 0000000000..e365b1e230
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
@@ -0,0 +1,126 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import org.apache.spark.graphx.util.GraphGenerators
+
+
+object GridPageRank {
+ def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+ val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+ val outDegree = Array.fill(nRows * nCols)(0)
+ // Convert row column address into vertex ids (row major order)
+ def sub2ind(r: Int, c: Int): Int = r * nCols + c
+ // Make the grid graph
+ for (r <- 0 until nRows; c <- 0 until nCols) {
+ val ind = sub2ind(r,c)
+ if (r+1 < nRows) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r+1,c)) += ind
+ }
+ if (c+1 < nCols) {
+ outDegree(ind) += 1
+ inNbrs(sub2ind(r,c+1)) += ind
+ }
+ }
+ // compute the pagerank
+ var pr = Array.fill(nRows * nCols)(resetProb)
+ for (iter <- 0 until nIter) {
+ val oldPr = pr
+ pr = new Array[Double](nRows * nCols)
+ for (ind <- 0 until (nRows * nCols)) {
+ pr(ind) = resetProb + (1.0 - resetProb) *
+ inNbrs(ind).map( nbr => oldPr(nbr) / outDegree(nbr)).sum
+ }
+ }
+ (0L until (nRows * nCols)).zip(pr)
+ }
+
+}
+
+
+class PageRankSuite extends FunSuite with LocalSparkContext {
+
+ def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
+ a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
+ .map { case (id, error) => error }.sum
+ }
+
+ test("Star PageRank") {
+ withSpark { sc =>
+ val nVertices = 100
+ val starGraph = GraphGenerators.starGraph(sc, nVertices).cache()
+ val resetProb = 0.15
+ val errorTol = 1.0e-5
+
+ val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
+ val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+
+ // Static PageRank should only take 2 iterations to converge
+ val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
+ if (pr1 != pr2) 1 else 0
+ }.map { case (vid, test) => test }.sum
+ assert(notMatching === 0)
+
+ val staticErrors = staticRanks2.map { case (vid, pr) =>
+ val correct = (vid > 0 && pr == resetProb) ||
+ (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+ if (!correct) 1 else 0
+ }
+ assert(staticErrors.sum === 0)
+
+ val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+ assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
+ assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
+ }
+ } // end of test Star PageRank
+
+
+
+ test("Grid PageRank") {
+ withSpark { sc =>
+ val rows = 10
+ val cols = 10
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 50
+ val errorTol = 1.0e-5
+ val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
+
+ val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
+ val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+
+ assert(compareRanks(staticRanks, referenceRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
+ assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
+ }
+ } // end of Grid PageRank
+
+
+ test("Chain PageRank") {
+ withSpark { sc =>
+ val chain1 = (0 until 9).map(x => (x, x+1) )
+ val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+ val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val resetProb = 0.15
+ val tol = 0.0001
+ val numIter = 10
+ val errorTol = 1.0e-5
+
+ val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
+ val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
+ val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+
+ assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+ assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
+ }
+ }
+}
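For reference, the GridPageRank helper above iterates the damped update that these tests check the GraphX implementations against; with alpha = resetProb, in equation form:

    PR_0(v) = \alpha, \qquad
    PR_{k+1}(v) = \alpha + (1 - \alpha) \sum_{u \in \mathrm{inNbrs}(v)} \frac{PR_k(u)}{\mathrm{outDeg}(u)}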
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
new file mode 100644
index 0000000000..06604198d7
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
+
+ test("Test SVD++ with mean square error on training set") {
+ withSpark { sc =>
+ val svdppErr = 8.0
+ val edges = sc.textFile("mllib/data/als/test.data").map { line =>
+ val fields = line.split(",")
+ Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
+ }
+ val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+ val (graph, u) = SVDPlusPlus.run(edges, conf)
+ val err = graph.vertices.collect.map{ case (vid, vd) =>
+ if (vid % 2 == 1) vd._4 else 0.0
+ }.reduce(_ + _) / graph.triplets.collect.size
+ assert(err <= svdppErr)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
new file mode 100644
index 0000000000..696b80944e
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
@@ -0,0 +1,57 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
+
+ test("Island Strongly Connected Components") {
+ withSpark { sc =>
+ val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
+ val edges = sc.parallelize(Seq.empty[Edge[Int]])
+ val graph = Graph(vertices, edges)
+ val sccGraph = StronglyConnectedComponents.run(graph, 5)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(id == scc)
+ }
+ }
+ }
+
+ test("Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ assert(0L == scc)
+ }
+ }
+ }
+
+ test("2 Cycle Strongly Connected Components") {
+ withSpark { sc =>
+ val edges =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(3L -> 4L, 4L -> 5L, 5L -> 3L) ++
+ Array(6L -> 0L, 5L -> 7L)
+ val rawEdges = sc.parallelize(edges)
+ val graph = Graph.fromEdgeTuples(rawEdges, -1)
+ val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ for ((id, scc) <- sccGraph.vertices.collect) {
+ if (id < 3)
+ assert(0L == scc)
+ else if (id < 6)
+ assert(3L == scc)
+ else
+ assert(id == scc)
+ }
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
new file mode 100644
index 0000000000..0e59912754
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
@@ -0,0 +1,73 @@
+package org.apache.spark.graphx.algorithms
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+
+class TriangleCountSuite extends FunSuite with LocalSparkContext {
+
+ test("Count a single triangle") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+ test("Count two triangles") {
+ withSpark { sc =>
+ val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+ val rawEdges = sc.parallelize(triangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 2)
+ } else {
+ assert(count === 1)
+ }
+ }
+ }
+ }
+
+ test("Count two triangles with bi-directed edges") {
+ withSpark { sc =>
+ val triangles =
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+ val revTriangles = triangles.map { case (a,b) => (b,a) }
+ val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect().foreach { case (vid, count) =>
+ if (vid == 0) {
+ assert(count === 4)
+ } else {
+ assert(count === 2)
+ }
+ }
+ }
+ }
+
+ test("Count a single triangle with duplicate edges") {
+ withSpark { sc =>
+ val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+ Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
+ val triangleCount = TriangleCount.run(graph)
+ val verts = triangleCount.vertices
+ verts.collect.foreach { case (vid, count) => assert(count === 1) }
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
new file mode 100644
index 0000000000..eb82436f09
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -0,0 +1,76 @@
+package org.apache.spark.graphx.impl
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class EdgePartitionSuite extends FunSuite {
+
+ test("reverse") {
+ val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.reverse.iterator.map(_.copy()).toList === reversedEdges)
+ assert(edgePartition.reverse.reverse.iterator.map(_.copy()).toList === edges)
+ }
+
+ test("map") {
+ val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.map(e => e.srcId + e.dstId).iterator.map(_.copy()).toList ===
+ edges.map(e => e.copy(attr = e.srcId + e.dstId)))
+ }
+
+ test("groupEdges") {
+ val edges = List(
+ Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32))
+ val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36))
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- edges) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges)
+ }
+
+ test("indexIterator") {
+ val edgesFrom0 = List(Edge(0, 1, 0))
+ val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0))
+ val sortedEdges = edgesFrom0 ++ edgesFrom1
+ val builder = new EdgePartitionBuilder[Int]
+ for (e <- Random.shuffle(sortedEdges)) {
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+
+ val edgePartition = builder.toEdgePartition
+ assert(edgePartition.iterator.map(_.copy()).toList === sortedEdges)
+ assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
+ assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
+ }
+
+ test("innerJoin") {
+ def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+ val builder = new EdgePartitionBuilder[A]
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
+ builder.toEdgePartition
+ }
+ val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+ val a = makeEdgePartition(aList)
+ val b = makeEdgePartition(bList)
+
+ assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
+ List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
new file mode 100644
index 0000000000..d37d64e8c8
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -0,0 +1,113 @@
+package org.apache.spark.graphx.impl
+
+import org.apache.spark.graphx._
+import org.scalatest.FunSuite
+
+class VertexPartitionSuite extends FunSuite {
+
+ test("isDefined, filter") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).filter { (vid, attr) => vid == 0 }
+ assert(vp.isDefined(0))
+ assert(!vp.isDefined(1))
+ assert(!vp.isDefined(2))
+ assert(!vp.isDefined(-1))
+ }
+
+ test("isActive, numActives, replaceActives") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1)))
+ .filter { (vid, attr) => vid == 0 }
+ .replaceActives(Iterator(0, 2, 0))
+ assert(vp.isActive(0))
+ assert(!vp.isActive(1))
+ assert(vp.isActive(2))
+ assert(!vp.isActive(-1))
+ assert(vp.numActives == Some(2))
+ }
+
+ test("map") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 }
+ assert(vp(0) === 2)
+ }
+
+ test("diff") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2 = vp.filter { (vid, attr) => vid <= 1 }
+ val vp3a = vp.map { (vid, attr) => 2 }
+ val vp3b = VertexPartition(vp3a.iterator)
+ // diff with same index
+ val diff1 = vp2.diff(vp3a)
+ assert(diff1(0) === 2)
+ assert(diff1(1) === 2)
+ assert(diff1(2) === 2)
+ assert(!diff1.isDefined(2))
+ // diff with different indexes
+ val diff2 = vp2.diff(vp3b)
+ assert(diff2(0) === 2)
+ assert(diff2(1) === 2)
+ assert(diff2(2) === 2)
+ assert(!diff2.isDefined(2))
+ }
+
+ test("leftJoin") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 }
+ val vp2b = VertexPartition(vp2a.iterator)
+ // leftJoin with same index
+ val join1 = vp.leftJoin(vp2a) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join1.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ // leftJoin with different indexes
+ val join2 = vp.leftJoin(vp2b) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join2.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ // leftJoin an iterator
+ val join3 = vp.leftJoin(vp2a.iterator) { (vid, a, bOpt) => bOpt.getOrElse(a) }
+ assert(join3.iterator.toSet === Set((0L, 2), (1L, 2), (2L, 1)))
+ }
+
+ test("innerJoin") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2a = vp.filter { (vid, attr) => vid <= 1 }.map { (vid, attr) => 2 }
+ val vp2b = VertexPartition(vp2a.iterator)
+ // innerJoin with same index
+ val join1 = vp.innerJoin(vp2a) { (vid, a, b) => b }
+ assert(join1.iterator.toSet === Set((0L, 2), (1L, 2)))
+ // innerJoin with different indexes
+ val join2 = vp.innerJoin(vp2b) { (vid, a, b) => b }
+ assert(join2.iterator.toSet === Set((0L, 2), (1L, 2)))
+ // innerJoin an iterator
+ val join3 = vp.innerJoin(vp2a.iterator) { (vid, a, b) => b }
+ assert(join3.iterator.toSet === Set((0L, 2), (1L, 2)))
+ }
+
+ test("createUsingIndex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val elems = List((0L, 2), (2L, 2), (3L, 2))
+ val vp2 = vp.createUsingIndex(elems.iterator)
+ assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2)))
+ assert(vp.index === vp2.index)
+ }
+
+ test("innerJoinKeepLeft") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val elems = List((0L, 2), (2L, 2), (3L, 2))
+ val vp2 = vp.innerJoinKeepLeft(elems.iterator)
+ assert(vp2.iterator.toSet === Set((0L, 2), (2L, 2)))
+ assert(vp2(1) === 1)
+ }
+
+ test("aggregateUsingIndex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val messages = List((0L, "a"), (2L, "b"), (0L, "c"), (3L, "d"))
+ val vp2 = vp.aggregateUsingIndex[String](messages.iterator, _ + _)
+ assert(vp2.iterator.toSet === Set((0L, "ac"), (2L, "b")))
+ }
+
+ test("reindex") {
+ val vp = VertexPartition(Iterator((0L, 1), (1L, 1), (2L, 1)))
+ val vp2 = vp.filter { (vid, attr) => vid <= 1 }
+ val vp3 = vp2.reindex()
+ assert(vp2.iterator.toSet === vp3.iterator.toSet)
+ assert(vp2(2) === 1)
+ assert(vp3.index.getPos(2) === -1)
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
new file mode 100644
index 0000000000..11db339750
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
@@ -0,0 +1,93 @@
+package org.apache.spark.graphx.util
+
+import org.scalatest.FunSuite
+
+
+class BytecodeUtilsSuite extends FunSuite {
+
+ import BytecodeUtilsSuite.TestClass
+
+ test("closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+
+ val c2 = {e: TestClass => println(e.foo); println(e.bar); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+
+ val c3 = {e: TestClass => println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz"))
+ }
+
+ test("closure inside a closure inside a closure invokes a method") {
+ val c1 = {e: TestClass => println(e.baz); }
+ val c2 = {e: TestClass => c1(e); println(e.foo); }
+ val c3 = {e: TestClass => c2(e) }
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method") {
+ def zoo(e: TestClass) {
+ println(e.baz)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("closure calling a function that invokes a method which uses another closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass) {
+ c2(e)
+ }
+ val c1 = {e: TestClass => zoo(e)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ test("nested closure") {
+ val c2 = {e: TestClass => println(e.baz)}
+ def zoo(e: TestClass, c: TestClass => Unit) {
+ c(e)
+ }
+ val c1 = {e: TestClass => zoo(e, c2)}
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+ assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+ assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+ }
+
+ // The following doesn't work yet, because the byte code doesn't contain any information
+ // about what exactly "c" is.
+// test("invoke interface") {
+// val c1 = {e: TestClass => c(e)}
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo"))
+// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar"))
+// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz"))
+// }
+
+ private val c = {e: TestClass => println(e.baz)}
+}
+
+
+object BytecodeUtilsSuite {
+ class TestClass(val foo: Int, val bar: Long) {
+ def baz: Boolean = false
+ }
+}