author     Reynold Xin <rxin@databricks.com>    2015-04-08 11:31:48 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-08 11:31:48 -0700
commit     8d812f9986f2edf420a18ca822711c9765f480e2 (patch)
tree       4a1ddb4c50fde7fb398429ce07471c34b9a95bf0 /graphx
parent     9d44ddce1d1e19011026605549c37d0db6d6afa1 (diff)
[SPARK-6765] Fix test code style for graphx.
So we can turn style checker on for test code.

Author: Reynold Xin <rxin@databricks.com>

Closes #5410 from rxin/test-style-graphx and squashes the following commits:

89e253a [Reynold Xin] [SPARK-6765] Fix test code style for graphx.
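For readers skimming the diff, the sketch below (not part of this commit; the object, method, and variable names are invented for illustration) condenses the conventions the changes apply across the graphx test suites: explicit result types on defs, parentheses on eager RDD actions such as collect(), count(), and sum(), spaces around binary operators, braces on multi-line if/else branches, and wrapping of over-long lines.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch only; not part of this commit. It mirrors the style rules
// the diff enforces: explicit result types, parentheses on eager RDD actions,
// operator spacing, and braces around multi-line if/else branches.
object StyleConventionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "style-sketch", new SparkConf())
    try {
      // Explicit return type, and `x + 1` rather than `x+1`.
      def shiftedIds(n: Int): Seq[Long] = (0 until n).map(x => (x + 1).toLong)

      val ids = sc.parallelize(shiftedIds(10))
      // Eager actions take explicit parentheses: collect(), count(), sum().
      assert(ids.count() == 10)
      val evens = ids.collect().filter { id =>
        // Multi-line branches get braces instead of bare single-line arms.
        if (id % 2 == 0) {
          true
        } else {
          false
        }
      }
      assert(evens.length == 5)
    } finally {
      sc.stop()
    }
  }
}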
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala                          | 71
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala                   |  2
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala                      | 26
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala        | 18
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala                   | 33
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala | 23
6 files changed, 88 insertions(+), 85 deletions(-)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 8d15150458..a570e4ed75 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -38,12 +38,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val doubleRing = ring ++ ring
val graph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1)
assert(graph.edges.count() === doubleRing.size)
- assert(graph.edges.collect.forall(e => e.attr == 1))
+ assert(graph.edges.collect().forall(e => e.attr == 1))
// uniqueEdges option should uniquify edges and store duplicate count in edge attributes
val uniqueGraph = Graph.fromEdgeTuples(sc.parallelize(doubleRing), 1, Some(RandomVertexCut))
assert(uniqueGraph.edges.count() === ring.size)
- assert(uniqueGraph.edges.collect.forall(e => e.attr == 2))
+ assert(uniqueGraph.edges.collect().forall(e => e.attr == 2))
}
}
@@ -64,7 +64,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
assert( graph.vertices.count() === 100)
- graph.triplets.collect.map { et =>
+ graph.triplets.collect().map { et =>
assert((et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr))
assert((et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr))
}
@@ -75,15 +75,17 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
- (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
+ assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect().toSet
+ === (1 to n).map(x => (0: VertexId, x: VertexId, "v", "v")).toSet)
}
}
test("partitionBy") {
withSpark { sc =>
- def mkGraph(edges: List[(Long, Long)]) = Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
- def nonemptyParts(graph: Graph[Int, Int]) = {
+ def mkGraph(edges: List[(Long, Long)]): Graph[Int, Int] = {
+ Graph.fromEdgeTuples(sc.parallelize(edges, 2), 0)
+ }
+ def nonemptyParts(graph: Graph[Int, Int]): RDD[List[Edge[Int]]] = {
graph.edges.partitionsRDD.mapPartitions { iter =>
Iterator(iter.next()._2.iterator.toList)
}.filter(_.nonEmpty)
@@ -102,7 +104,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1)
// partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into
// the same partition
- assert(nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
+ assert(
+ nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1)
// partitionBy(EdgePartition2D) puts identical edges in the same partition
assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1)
@@ -140,10 +143,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
- assert(g.triplets.collect.map(_.toTuple).toSet ===
+ assert(g.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = g.partitionBy(EdgePartition2D)
- assert(gPart.triplets.collect.map(_.toTuple).toSet ===
+ assert(gPart.triplets.collect().map(_.toTuple).toSet ===
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
}
}
@@ -154,10 +157,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
+ assert(mappedVAttrs.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
+ assert(mappedVAttrs2.vertices.collect().toSet === (0 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -177,12 +180,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
// Trigger initial vertex replication
graph0.triplets.foreach(x => {})
// Change type of replicated vertices, but preserve erased type
- val graph1 = graph0.mapVertices {
- case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
+ val graph1 = graph0.mapVertices { case (vid, integerOpt) =>
+ integerOpt.map((x: java.lang.Integer) => x.toDouble: java.lang.Double)
}
// Access replicated vertices, exposing the erased type
val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
- assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
+ assert(graph2.edges.map(_.attr).collect().toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
}
}
@@ -202,7 +205,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect.toSet ===
+ assert(star.mapTriplets(et => et.srcAttr + et.dstAttr).edges.collect().toSet ===
(1L to n).map(x => Edge(0, x, "vv")).toSet)
}
}
@@ -211,7 +214,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
+ assert(star.reverse.outDegrees.collect().toSet === (1 to n).map(x => (x: VertexId, 1)).toSet)
}
}
@@ -221,7 +224,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
- assert(result.collect.toSet === Set((1L, 2)))
+ assert(result.collect().toSet === Set((1L, 2)))
}
}
@@ -237,7 +240,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(subgraph.vertices.collect().toSet === (0 to n by 2).map(x => (x, "v")).toSet)
// And 4 edges.
- assert(subgraph.edges.map(_.copy()).collect().toSet === (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
+ assert(subgraph.edges.map(_.copy()).collect().toSet ===
+ (2 to n by 2).map(x => Edge(0, x, 1)).toSet)
}
}
@@ -273,9 +277,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
sc.parallelize((1 to n).flatMap(x =>
List((0: VertexId, x: VertexId), (0: VertexId, x: VertexId))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
- assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
- star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
- assert(star2.vertices.collect.toSet === star.vertices.collect.toSet)
+ assert(star2.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]) ===
+ star.edges.collect().toArray.sorted(Edge.lexicographicOrdering[Int]))
+ assert(star2.vertices.collect().toSet === star.vertices.collect().toSet)
}
}
@@ -300,21 +304,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
throw new Exception("map ran on edge with dst vid %d, which is odd".format(et.dstId))
}
Iterator((et.srcId, 1))
- }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
+ }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect().toSet
assert(numEvenNeighbors === (1 to n).map(x => (x: VertexId, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x+1) % n: VertexId)), 3)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexId, (x + 1) % n: VertexId)), 3)
val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache()
val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
- val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) =>
+ newOpt.getOrElse(old)
+ }
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
if (et.srcId % 2 != 1) {
throw new Exception("map ran on edge with src vid %d, which is even".format(et.dstId))
}
Iterator((et.dstId, 1))
- }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
+ }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect().toSet
assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexId, 1)).toSet)
}
@@ -340,17 +346,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val reverseStar = starGraph(sc, n).reverse.cache()
// outerJoinVertices changing type
- val reverseStarDegrees =
- reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
+ val reverseStarDegrees = reverseStar.outerJoinVertices(reverseStar.outDegrees) {
+ (vid, a, bOpt) => bOpt.getOrElse(0)
+ }
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
- (a: Int, b: Int) => a + b).collect.toSet
+ (a: Int, b: Int) => a + b).collect().toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
- assert(newReverseStar.vertices.map(_._2).collect.toSet ===
+ assert(newReverseStar.vertices.map(_._2).collect().toSet ===
(0 to n).map(x => "v%d".format(x)).toSet)
}
}
@@ -361,7 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
val graph = Graph(verts, edges)
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
- .collect.toSet
+ .collect().toSet
assert(triplets ===
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
}
@@ -417,7 +424,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val graph = Graph.fromEdgeTuples(edges, 1)
val neighborAttrSums = graph.mapReduceTriplets[Int](
et => Iterator((et.dstId, et.srcAttr)), _ + _)
- assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+ assert(neighborAttrSums.collect().toSet === Set((0: VertexId, n)))
} finally {
sc.stop()
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
index a3e28efc75..d2ad9be555 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
*/
trait LocalSparkContext {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
- def withSpark[T](f: SparkContext => T) = {
+ def withSpark[T](f: SparkContext => T): T = {
val conf = new SparkConf()
GraphXUtils.registerKryoClasses(conf)
val sc = new SparkContext("local", "test", conf)
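The withSpark fixture changed above is what every suite in this diff runs inside; a hedged sketch of typical usage follows (the suite class, test name, and graph built here are invented for illustration; the explicit `: T` return type added above is what lets the block's result flow back to the caller):

package org.apache.spark.graphx

import org.scalatest.FunSuite

// Illustrative only, not part of this commit: how a suite in this module uses the
// LocalSparkContext fixture. The fixture creates a local SparkContext, runs the
// block, and stops the context afterwards, returning the block's value.
class WithSparkUsageSketch extends FunSuite with LocalSparkContext {
  test("withSpark returns the block's value and stops the context afterwards") {
    val edgeCount: Long = withSpark { sc =>
      val rawEdges = sc.parallelize((1L to 5L).map(x => (0L, x)))
      Graph.fromEdgeTuples(rawEdges, 1).edges.count()
    }
    assert(edgeCount === 5)
  }
}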
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index c9443d11c7..d0a7198d69 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.StorageLevel
class VertexRDDSuite extends FunSuite with LocalSparkContext {
- def vertices(sc: SparkContext, n: Int) = {
+ private def vertices(sc: SparkContext, n: Int) = {
VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5))
}
@@ -52,7 +52,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => (i.toLong, 0))).cache()
val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1))).cache()
val vertexC = vertexA.minus(vertexB)
- assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+ assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}
@@ -62,7 +62,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB: RDD[(VertexId, Int)] =
sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
val vertexC = vertexA.minus(vertexB)
- assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+ assert(vertexC.map(_._1).collect().toSet === (0 until 25).toSet)
}
}
@@ -72,7 +72,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.minus(vertexB)
- assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
+ assert(vertexC.map(_._1).collect().toSet === (0 until 50).toSet)
}
}
@@ -106,7 +106,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1)))
assert(vertexA.partitions.size != vertexB.partitions.size)
val vertexC = vertexA.diff(vertexB)
- assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet)
+ assert(vertexC.map(_._1).collect().toSet === (8 until 16).toSet)
}
}
@@ -116,11 +116,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// leftJoin with another VertexRDD
- assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
// leftJoin with an RDD
val evensRDD = evens.map(identity)
- assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
+ assert(verts.leftJoin(evensRDD) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
}
}
@@ -134,7 +134,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
old - newOpt.getOrElse(0)
}
- assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet)
+ assert(vertexC.filter(v => v._2 != 0).map(_._1).collect().toSet == (1 to 99 by 2).toSet)
}
}
@@ -144,11 +144,11 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n).cache()
val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// innerJoin with another VertexRDD
- assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
+ assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet)
// innerJoin with an RDD
val evensRDD = evens.map(identity)
- assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect.toSet ===
+ assert(verts.innerJoin(evensRDD) { (id, a, b) => a - b }.collect().toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet) }
}
@@ -161,7 +161,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
old - newVal
}
- assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet)
+ assert(vertexC.filter(v => v._2 == 0).map(_._1).collect().toSet == (0 to 98 by 2).toSet)
}
}
@@ -171,7 +171,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val verts = vertices(sc, n)
val messageTargets = (0 to n) ++ (0 to n by 2)
val messages = sc.parallelize(messageTargets.map(x => (x.toLong, 1)))
- assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect.toSet ===
+ assert(verts.aggregateUsingIndex[Int](messages, _ + _).collect().toSet ===
(0 to n).map(x => (x.toLong, if (x % 2 == 0) 2 else 1)).toSet)
}
}
@@ -183,7 +183,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
// test merge function
- assert(rdd.collect.toSet == Set((0L, 0), (1L, 3), (2L, 9)))
+ assert(rdd.collect().toSet == Set((0L, 0), (1L, 3), (2L, 9)))
}
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 3915be15b3..4cc30a9640 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -32,7 +32,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
val ccGraph = gridGraph.connectedComponents()
- val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components
@@ -42,7 +42,7 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
val ccGraph = gridGraph.connectedComponents()
- val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
+ val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum()
assert(maxCCid === 0)
}
} // end of Grid connected components
@@ -50,8 +50,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Chain Connected Components") {
withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val chain2 = (10 until 20).map(x => (x, x+1) )
+ val chain1 = (0 until 9).map(x => (x, x + 1))
+ val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
val ccGraph = twoChains.connectedComponents()
@@ -73,12 +73,12 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Reverse Chain Connected Components") {
withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val chain2 = (10 until 20).map(x => (x, x+1) )
+ val chain1 = (0 until 9).map(x => (x, x + 1))
+ val chain2 = (10 until 20).map(x => (x, x + 1))
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
val ccGraph = twoChains.connectedComponents()
- val vertices = ccGraph.vertices.collect
+ val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if (id < 10) {
assert(cc === 0)
@@ -120,9 +120,9 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
val ccGraph = graph.connectedComponents()
- val vertices = ccGraph.vertices.collect
+ val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
- assert(cc == 0)
+ assert(cc === 0)
}
}
} // end of toy connected components
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index fc491ae327..95804b07b1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -19,15 +19,12 @@ package org.apache.spark.graphx.lib
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
-import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
-import org.apache.spark.rdd._
+
object GridPageRank {
- def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
+ def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
@@ -35,13 +32,13 @@ object GridPageRank {
// Make the grid graph
for (r <- 0 until nRows; c <- 0 until nCols) {
val ind = sub2ind(r,c)
- if (r+1 < nRows) {
+ if (r + 1 < nRows) {
outDegree(ind) += 1
- inNbrs(sub2ind(r+1,c)) += ind
+ inNbrs(sub2ind(r + 1,c)) += ind
}
- if (c+1 < nCols) {
+ if (c + 1 < nCols) {
outDegree(ind) += 1
- inNbrs(sub2ind(r,c+1)) += ind
+ inNbrs(sub2ind(r,c + 1)) += ind
}
}
// compute the pagerank
@@ -64,7 +61,7 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
- .map { case (id, error) => error }.sum
+ .map { case (id, error) => error }.sum()
}
test("Star PageRank") {
@@ -80,12 +77,12 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
// Static PageRank should only take 2 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
if (pr1 != pr2) 1 else 0
- }.map { case (vid, test) => test }.sum
+ }.map { case (vid, test) => test }.sum()
assert(notMatching === 0)
val staticErrors = staticRanks2.map { case (vid, pr) =>
- val correct = (vid > 0 && pr == resetProb) ||
- (vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
+ val p = math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) ))
+ val correct = (vid > 0 && pr == resetProb) || (vid == 0L && p < 1.0E-5)
if (!correct) 1 else 0
}
assert(staticErrors.sum === 0)
@@ -95,8 +92,6 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
}
} // end of test Star PageRank
-
-
test("Grid PageRank") {
withSpark { sc =>
val rows = 10
@@ -109,18 +104,18 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
- val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
+ val referenceRanks = VertexRDD(
+ sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
assert(compareRanks(staticRanks, referenceRanks) < errorTol)
assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
}
} // end of Grid PageRank
-
test("Chain PageRank") {
withSpark { sc =>
- val chain1 = (0 until 9).map(x => (x, x+1) )
- val rawEdges = sc.parallelize(chain1, 1).map { case (s,d) => (s.toLong, d.toLong) }
+ val chain1 = (0 until 9).map(x => (x, x + 1))
+ val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
val resetProb = 0.15
val tol = 0.0001
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
index df54aa37ca..1f658c371f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/StronglyConnectedComponentsSuite.scala
@@ -34,8 +34,8 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val edges = sc.parallelize(Seq.empty[Edge[Int]])
val graph = Graph(vertices, edges)
val sccGraph = graph.stronglyConnectedComponents(5)
- for ((id, scc) <- sccGraph.vertices.collect) {
- assert(id == scc)
+ for ((id, scc) <- sccGraph.vertices.collect()) {
+ assert(id === scc)
}
}
}
@@ -45,8 +45,8 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
val graph = Graph.fromEdgeTuples(rawEdges, -1)
val sccGraph = graph.stronglyConnectedComponents(20)
- for ((id, scc) <- sccGraph.vertices.collect) {
- assert(0L == scc)
+ for ((id, scc) <- sccGraph.vertices.collect()) {
+ assert(0L === scc)
}
}
}
@@ -60,13 +60,14 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val rawEdges = sc.parallelize(edges)
val graph = Graph.fromEdgeTuples(rawEdges, -1)
val sccGraph = graph.stronglyConnectedComponents(20)
- for ((id, scc) <- sccGraph.vertices.collect) {
- if (id < 3)
- assert(0L == scc)
- else if (id < 6)
- assert(3L == scc)
- else
- assert(id == scc)
+ for ((id, scc) <- sccGraph.vertices.collect()) {
+ if (id < 3) {
+ assert(0L === scc)
+ } else if (id < 6) {
+ assert(3L === scc)
+ } else {
+ assert(id === scc)
+ }
}
}
}