From b69f8b2a01669851c656739b6886efe4cddef31a Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 9 Feb 2014 10:09:19 -0800 Subject: Merge pull request #557 from ScrapCodes/style. Closes #557. SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Author: Patrick Wendell Author: Prashant Sharma == Merge branch commits == commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4 Author: Prashant Sharma Date: Sun Feb 9 17:39:07 2014 +0530 scala style fixes commit f91709887a8e0b608c5c2b282db19b8a44d53a43 Author: Patrick Wendell Date: Fri Jan 24 11:22:53 2014 -0800 Adding scalastyle snapshot --- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 7 ++--- .../main/scala/org/apache/spark/graphx/Graph.scala | 19 ++++++------- .../apache/spark/graphx/PartitionStrategy.scala | 14 +++++----- .../scala/org/apache/spark/graphx/VertexRDD.scala | 23 +++++++++------- .../org/apache/spark/graphx/lib/Analytics.scala | 2 +- .../apache/spark/graphx/util/GraphGenerators.scala | 31 +++++++++++----------- 6 files changed, 52 insertions(+), 44 deletions(-) (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index fe03ae4a62..799a9dd1ee 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -66,7 +66,8 @@ class EdgeRDD[@specialized ED: ClassTag]( this } - private[graphx] def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]) + private[graphx] def mapEdgePartitions[ED2: ClassTag]( + f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]) : EdgeRDD[ED2] = { new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => val (pid, ep) = iter.next() @@ -97,8 +98,8 @@ class EdgeRDD[@specialized ED: ClassTag]( * * @param other the EdgeRDD to join with * @param f the join function applied to corresponding values of `this` and `other` - * @return a new EdgeRDD
containing only edges that appear in both `this` and `other`, with values - * supplied by `f` + * @return a new EdgeRDD containing only edges that appear in both `this` and `other`, + * with values supplied by `f` */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index eea95d38d5..65a1a8c68f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -171,8 +171,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab : Graph[VD, ED2] /** - * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes - * as well. If adjacent vertex values are not required, consider using `mapEdges` instead. + * Transforms each edge attribute using the map function, passing it the adjacent vertex + * attributes as well. If adjacent vertex values are not required, + * consider using `mapEdges` instead. * * @note This does not change the structure of the * graph or modify the values of this graph. As a consequence @@ -280,13 +281,13 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * be commutative and associative and is used to combine the output * of the map phase * - * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider - * when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on edges with - * destination in the active set. If the direction is `Out`, `mapFunc` will only be run on edges - * originating from vertices in the active set. If the direction is `Either`, `mapFunc` will be - * run on edges with *either* vertex in the active set. If the direction is `Both`, `mapFunc` will - * be run on edges with *both* vertices in the active set.
The active set must have the same index - * as the graph's vertices. + * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to + * consider when running `mapFunc`. If the direction is `In`, `mapFunc` will only be run on + * edges with destination in the active set. If the direction is `Out`, + * `mapFunc` will only be run on edges originating from vertices in the active set. If the + * direction is `Either`, `mapFunc` will be run on edges with *either* vertex in the active set + * . If the direction is `Both`, `mapFunc` will be run on edges with *both* vertices in the + * active set. The active set must have the same index as the graph's vertices. * * @example We can use this function to compute the in-degree of each * vertex diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 929915362c..0470d74cf9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -57,8 +57,9 @@ object PartitionStrategy { * * * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the - * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice - * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last + * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice + * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, + * P6)` or the last * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be * replicated to at most `2 * sqrt(numParts)` machines. * @@ -66,11 +67,12 @@ object PartitionStrategy { * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the * vertex locations.
* - * One of the limitations of this approach is that the number of machines must either be a perfect - * square. We partially address this limitation by computing the machine assignment to the next + * One of the limitations of this approach is that the number of machines must either be a + * perfect square. We partially address this limitation by computing the machine assignment to + * the next * largest perfect square and then mapping back down to the actual number of machines. - * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square - * is used. + * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect + * square is used. */ case object EdgePartition2D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index edd59bcf32..d6788d4d4b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -59,7 +59,8 @@ class VertexRDD[@specialized VD: ClassTag]( /** * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting - * VertexRDD will be based on a different index and can no longer be quickly joined with this RDD. + * VertexRDD will be based on a different index and can no longer be quickly joined with this + * RDD. */ def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) @@ -101,7 +102,8 @@ class VertexRDD[@specialized VD: ClassTag]( /** * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD.
*/ - private[graphx] def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2]) + private[graphx] def mapVertexPartitions[VD2: ClassTag]( + f: VertexPartition[VD] => VertexPartition[VD2]) : VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) new VertexRDD(newPartitionsRDD) @@ -159,8 +161,9 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Left joins this RDD with another VertexRDD with the same index. This function will fail if both - * VertexRDDs do not share the same index. The resulting vertex set contains an entry for each + * Left joins this RDD with another VertexRDD with the same index. This function will fail if + * both VertexRDDs do not share the same index. The resulting vertex set contains an entry for + * each * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`. * * @tparam VD2 the attribute type of the other VertexRDD @@ -187,8 +190,8 @@ class VertexRDD[@specialized VD: ClassTag]( * Left joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is * backed by a VertexRDD with the same index then the efficient [[leftZipJoin]] implementation is * used. The resulting VertexRDD contains an entry for each vertex in `this`. If `other` is - * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, the vertex - * is picked arbitrarily. + * missing any vertex in this VertexRDD, `f` is passed `None`. If there are duplicates, + * the vertex is picked arbitrarily. * * @tparam VD2 the attribute type of the other VertexRDD * @tparam VD3 the attribute type of the resulting VertexRDD @@ -238,14 +241,14 @@ class VertexRDD[@specialized VD: ClassTag]( /** * Inner joins this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD is - * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation is - * used.
+ * backed by a VertexRDD with the same index then the efficient [[innerZipJoin]] implementation + * is used. * * @param other an RDD containing vertices to join. If there are multiple entries for the same * vertex, one is picked arbitrarily. Use [[aggregateUsingIndex]] to merge multiple entries. * @param f the join function applied to corresponding values of `this` and `other` - * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both `this` - * and `other`, with values supplied by `f` + * @return a VertexRDD co-indexed with `this`, containing only vertices that appear in both + * `this` and `other`, with values supplied by `f` */ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) (f: (VertexId, VD, U) => VD2): VertexRDD[VD2] = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index f914e0565c..24699dfdd3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -82,7 +82,7 @@ object Analytics extends Logging { val pr = graph.pageRank(tol).vertices.cache() - println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_)) + println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) if (!outFname.isEmpty) { logWarning("Saving pageranks of pages to " + outFname) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 7677641bfe..f841846c0e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -37,11 +37,7 @@ object GraphGenerators { val RMATa = 0.45 val RMATb = 0.15 - val RMATc = 0.15 val RMATd = 0.25 - - // Right now it just generates a bunch of edges where - // the edge data is the weight (default 1) /**
* Generate a graph whose vertex out degree is log normal. */ @@ -59,15 +55,20 @@ object GraphGenerators { Graph(vertices, edges, 0) } + // Right now it just generates a bunch of edges where + // the edge data is the weight (default 1) + val RMATc = 0.15 + def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = { val rand = new Random() Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) } } /** - * Randomly samples from a log normal distribution whose corresponding normal distribution has the - * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`, `s` are - * the mean, standard deviation of the lognormal distribution and `Z ~ N(0, 1)`. In this function, + * Randomly samples from a log normal distribution whose corresponding normal distribution has + * the given mean and standard deviation. It uses the formula `X = exp(m+s*Z)` where `m`, + * `s` are the mean, standard deviation of the lognormal distribution and + * `Z ~ N(0, 1)`. In this function, * `m = e^(mu+sigma^2/2)` and `s = sqrt[(e^(sigma^2) - 1)(e^(2*mu+sigma^2))]`.
* * @param mu the mean of the normal distribution @@ -76,7 +77,7 @@ object GraphGenerators { */ private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = { val rand = new Random() - val m = math.exp(mu+(sigma*sigma)/2.0) + val m = math.exp(mu + (sigma * sigma) / 2.0) val s = math.sqrt((math.exp(sigma*sigma) - 1) * math.exp(2*mu + sigma*sigma)) // Z ~ N(0, 1) var X: Double = maxVal @@ -169,9 +170,9 @@ object GraphGenerators { val newT = math.round(t.toFloat/2.0).toInt pickQuadrant(RMATa, RMATb, RMATc, RMATd) match { case 0 => chooseCell(x, y, newT) - case 1 => chooseCell(x+newT, y, newT) - case 2 => chooseCell(x, y+newT, newT) - case 3 => chooseCell(x+newT, y+newT, newT) + case 1 => chooseCell(x + newT, y, newT) + case 2 => chooseCell(x, y + newT, newT) + case 3 => chooseCell(x + newT, y + newT, newT) } } } @@ -179,8 +180,8 @@ object GraphGenerators { // TODO(crankshaw) turn result into an enum (or case class for pattern matching} private def pickQuadrant(a: Double, b: Double, c: Double, d: Double): Int = { if (a + b + c + d != 1.0) { - throw new IllegalArgumentException( - "R-MAT probability parameters sum to " + (a+b+c+d) + ", should sum to 1.0") + throw new IllegalArgumentException("R-MAT probability parameters sum to " + (a + b + c + d) + + ", should sum to 1.0") } val rand = new Random() val result = rand.nextDouble() @@ -212,8 +213,8 @@ object GraphGenerators { sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) val edges: RDD[Edge[Double]] = vertices.flatMap{ case (vid, (r,c)) => - (if (r+1 < rows) { Seq( (sub2ind(r, c), sub2ind(r+1, c))) } else { Seq.empty }) ++ - (if (c+1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c+1))) } else { Seq.empty }) + (if (r + 1 < rows) { Seq( (sub2ind(r, c), sub2ind(r + 1, c))) } else { Seq.empty }) ++ + (if (c + 1 < cols) { Seq( (sub2ind(r, c), sub2ind(r, c + 1))) } else { Seq.empty }) }.map{ case (src, dst) => Edge(src, dst, 1.0) } Graph(vertices, edges) } // end of
gridGraph -- cgit v1.2.3