From b69f8b2a01669851c656739b6886efe4cddef31a Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 9 Feb 2014 10:09:19 -0800
Subject: Merge pull request #557 from ScrapCodes/style. Closes #557.

SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.

Author: Patrick Wendell
Author: Prashant Sharma

== Merge branch commits ==

commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma
Date:   Sun Feb 9 17:39:07 2014 +0530

    scala style fixes

commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell
Date:   Fri Jan 24 11:22:53 2014 -0800

    Adding scalastyle snapshot
---
 .../main/scala/org/apache/spark/bagel/Bagel.scala | 55 +++++++++++++---------
 1 file changed, 32 insertions(+), 23 deletions(-)

diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 44e26bbb9e..281216612f 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,22 @@ object Bagel extends Logging {
   /**
    * Runs a Bagel program.
    * @param sc [[org.apache.spark.SparkContext]] to use for the program.
-   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
-   *                 the vertex id.
-   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
-   *                 empty array, i.e. sc.parallelize(Array[K, Message]()).
-   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
-   *                 message before sending (which often involves network I/O).
-   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
-   *                   and provides the result to each vertex in the next superstep.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+   *                 Key will be the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+   *                 this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+   *                 given vertex into one message before sending (which often involves network I/O).
+   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+   *                   after each superstep and provides the result to each vertex in the next
+   *                   superstep.
    * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *                      Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
-   *                     Defaults to caching in memory.
-   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
-   *                optional Aggregator and the current superstep,
+   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+   *                     intermediate RDDs in each superstep. Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+   *                the Vertex, optional Aggregator and the current superstep,
    *                and returns a set of (Vertex, outgoing Messages) pairs
    * @tparam K key
    * @tparam V vertex type
@@ -71,7 +72,7 @@ object Bagel extends Logging {
     var msgs = messages
     var noActivity = false
     do {
-      logInfo("Starting superstep "+superstep+".")
+      logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
 
       val aggregated = agg(verts, aggregator)
@@ -97,7 +98,8 @@ object Bagel extends Logging {
     verts
   }
 
-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
+   * storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -106,8 +108,8 @@ object Bagel extends Logging {
     messages: RDD[(K, M)],
     combiner: Combiner[M, C],
     partitioner: Partitioner,
     numPartitions: Int
   )(
-    compute: (V, Option[C], Int) => (V, Array[M])
-  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+    compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
+    combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
   /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +129,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
-   * and default storage level
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+   * [[org.apache.spark.HashPartitioner]] and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
     messages: RDD[(K, M)],
     combiner: Combiner[M, C],
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
@@ -138,9 +140,13 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
-  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
+    DEFAULT_STORAGE_LEVEL)(compute)
 
-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+   * default [[org.apache.spark.HashPartitioner]]
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -158,7 +164,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * default [[org.apache.spark.HashPartitioner]],
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
     messages: RDD[(K, M)],
     numPartitions: Int
   )(
@@ -171,7 +178,8 @@ object Bagel extends Logging {
     compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * the default [[org.apache.spark.HashPartitioner]]
    * and [[org.apache.spark.bagel.DefaultCombiner]]
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +235,9 @@ object Bagel extends Logging {
         })
 
         numMsgs += newMsgs.size
-        if (newVert.active)
+        if (newVert.active) {
           numActiveVerts += 1
+        }
 
         Some((newVert, newMsgs))
       }.persist(storageLevel)
--
cgit v1.2.3
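
As context for the Bagel.run overloads whose Scaladoc this patch re-wraps, the following is a minimal sketch of a caller, modeled on the no-Aggregator, default-HashPartitioner, DefaultCombiner overload. TestVertex, TestMessage, and the halting rule are hypothetical stand-ins for user-defined Vertex and Message subclasses; none of this code is part of the commit.

// Hypothetical driver for the Bagel.run overload documented above (no Aggregator,
// default HashPartitioner and DefaultCombiner). TestVertex and TestMessage are
// illustrative stand-ins for user-defined Vertex/Message subclasses.
import org.apache.spark.SparkContext
import org.apache.spark.bagel.{Bagel, Vertex, Message}

class TestVertex(val active: Boolean) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

object BagelExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "bagel-example")

    // Vertices keyed by id, as the @param vertices doc describes.
    val verts = sc.parallelize(Seq(("a", new TestVertex(true)), ("b", new TestVertex(true))))
    // Initial message set is empty, per the @param messages doc.
    val msgs = sc.parallelize(Array[(String, TestMessage)]())

    val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
      (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
        // compute returns (updated vertex, outgoing messages); here every
        // vertex deactivates after two supersteps and sends nothing.
        (new TestVertex(superstep < 2), Array[TestMessage]())
    }
    println(result.count())
    sc.stop()
  }
}

The compute closure returns the updated vertex state plus its outgoing messages; as the loop in the patched code suggests (noActivity, numActiveVerts), the superstep iteration ends once no vertex is active and no messages remain in flight.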