aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/main/scala/org
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-05-28 20:17:16 -0700
committerReynold Xin <rxin@databricks.com>2015-05-28 20:17:16 -0700
commitb069ad23d9b6cbfb3a8bf245547add4816669075 (patch)
tree82498e8fa61ae0399d1d7f2eefb04e521285da53 /graphx/src/main/scala/org
parent7f7505d8db7759ea46e904f767c23130eff1104a (diff)
downloadspark-b069ad23d9b6cbfb3a8bf245547add4816669075.tar.gz
spark-b069ad23d9b6cbfb3a8bf245547add4816669075.tar.bz2
spark-b069ad23d9b6cbfb3a8bf245547add4816669075.zip
[SPARK-7927] whitespace fixes for GraphX.
So we can enable a whitespace enforcement rule in the style checker to save code review time. Author: Reynold Xin <rxin@databricks.com> Closes #6474 from rxin/whitespace-graphx and squashes the following commits: 4d3cd26 [Reynold Xin] Fixed tests. 869dde4 [Reynold Xin] [SPARK-7927] whitespace fixes for GraphX.
Diffstat (limited to 'graphx/src/main/scala/org')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala10
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala8
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala8
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala9
10 files changed, 27 insertions, 26 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
index 058c8c8aa1..ce1054ed92 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
@@ -26,8 +26,8 @@ class EdgeDirection private (private val name: String) extends Serializable {
* out becomes in and both and either remain the same.
*/
def reverse: EdgeDirection = this match {
- case EdgeDirection.In => EdgeDirection.Out
- case EdgeDirection.Out => EdgeDirection.In
+ case EdgeDirection.In => EdgeDirection.Out
+ case EdgeDirection.Out => EdgeDirection.In
case EdgeDirection.Either => EdgeDirection.Either
case EdgeDirection.Both => EdgeDirection.Both
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index c8790cac3d..65f82429d2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -37,7 +37,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
/**
* Set the edge properties of this triplet.
*/
- protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD,ED] = {
+ protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
srcId = other.srcId
dstId = other.dstId
attr = other.attr
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 36dc7b0f86..db73a8abc5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -316,7 +316,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* satisfy the predicates
*/
def subgraph(
- epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 7edd627b20..9451ff1e5c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -124,18 +124,18 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
val nbrs = edgeDirection match {
case EdgeDirection.Either =>
- graph.aggregateMessages[Array[(VertexId,VD)]](
+ graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => {
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
(a, b) => a ++ b, TripletFields.All)
case EdgeDirection.In =>
- graph.aggregateMessages[Array[(VertexId,VD)]](
+ graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
(a, b) => a ++ b, TripletFields.Src)
case EdgeDirection.Out =>
- graph.aggregateMessages[Array[(VertexId,VD)]](
+ graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
(a, b) => a ++ b, TripletFields.Dst)
case EdgeDirection.Both =>
@@ -253,7 +253,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
- vpred: (VertexId, VD2) => Boolean = (v:VertexId, d:VD2) => true): Graph[VD, ED] = {
+ vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
@@ -356,7 +356,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 01b013ff71..cfcf7244ea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -147,10 +147,10 @@ object Pregel extends Logging {
logInfo("Pregel finished iteration " + i)
// Unpersist the RDDs hidden by newly-materialized RDDs
- oldMessages.unpersist(blocking=false)
- newVerts.unpersist(blocking=false)
- prevG.unpersistVertices(blocking=false)
- prevG.edges.unpersist(blocking=false)
+ oldMessages.unpersist(blocking = false)
+ newVerts.unpersist(blocking = false)
+ prevG.unpersistVertices(blocking = false)
+ prevG.edges.unpersist(blocking = false)
// count the iteration
i += 1
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index c561570809..ab021a252e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -156,8 +156,8 @@ class EdgePartition[
val size = data.size
var i = 0
while (i < size) {
- edge.srcId = srcIds(i)
- edge.dstId = dstIds(i)
+ edge.srcId = srcIds(i)
+ edge.dstId = dstIds(i)
edge.attr = data(i)
newData(i) = f(edge)
i += 1
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index bc974b2f04..8c0a461e99 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -116,7 +116,7 @@ object PageRank extends Logging {
val personalized = srcId isDefined
val src: VertexId = srcId.getOrElse(-1L)
- def delta(u: VertexId, v: VertexId):Double = { if (u == v) 1.0 else 0.0 }
+ def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
@@ -133,13 +133,13 @@ object PageRank extends Logging {
// edge partitions.
prevRankGraph = rankGraph
val rPrb = if (personalized) {
- (src: VertexId ,id: VertexId) => resetProb * delta(src,id)
+ (src: VertexId , id: VertexId) => resetProb * delta(src, id)
} else {
(src: VertexId, id: VertexId) => resetProb
}
rankGraph = rankGraph.joinVertices(rankUpdates) {
- (id, oldRank, msgSum) => rPrb(src,id) + (1.0 - resetProb) * msgSum
+ (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
}.cache()
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -243,7 +243,7 @@ object PageRank extends Logging {
// Execute a dynamic version of Pregel.
val vp = if (personalized) {
- (id: VertexId, attr: (Double, Double),msgSum: Double) =>
+ (id: VertexId, attr: (Double, Double), msgSum: Double) =>
personalizedVertexProgram(id, attr, msgSum)
} else {
(id: VertexId, attr: (Double, Double), msgSum: Double) =>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 3b0e1628d8..9cb24ed080 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -210,7 +210,7 @@ object SVDPlusPlus {
/**
* Forces materialization of a Graph by count()ing its RDDs.
*/
- private def materialize(g: Graph[_,_]): Unit = {
+ private def materialize(g: Graph[_, _]): Unit = {
g.vertices.count()
g.edges.count()
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index daf162085e..a5d598053f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -38,7 +38,7 @@ import org.apache.spark.graphx._
*/
object TriangleCount {
- def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache()
@@ -49,7 +49,7 @@ object TriangleCount {
var i = 0
while (i < nbrs.size) {
// prevent self cycle
- if(nbrs(i) != vid) {
+ if (nbrs(i) != vid) {
set.add(nbrs(i))
}
i += 1
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 2d6a825b61..9591c4e9b8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -243,14 +243,15 @@ object GraphGenerators {
* @return A graph containing vertices with the row and column ids
* as their attributes and edge values as 1.0.
*/
- def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
+ def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double] = {
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): VertexId = r * cols + c
- val vertices: RDD[(VertexId, (Int,Int))] =
- sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
+ val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(0 until rows).flatMap { r =>
+ (0 until cols).map( c => (sub2ind(r, c), (r, c)) )
+ }
val edges: RDD[Edge[Double]] =
- vertices.flatMap{ case (vid, (r,c)) =>
+ vertices.flatMap{ case (vid, (r, c)) =>
(if (r + 1 < rows) { Seq( (sub2ind(r, c), sub2ind(r + 1, c))) } else { Seq.empty }) ++
(if (c + 1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c + 1))) } else { Seq.empty })
}.map{ case (src, dst) => Edge(src, dst, 1.0) }