author     Patrick Wendell <pwendell@gmail.com>    2014-02-09 10:09:19 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2014-02-09 10:09:19 -0800
commit     b69f8b2a01669851c656739b6886efe4cddef31a (patch)
tree       9ed0b8a6883f0ae64ce1d3f5c206306731a5f93f /bagel/src/main
parent     b6dba10ae59215b5c4e40f7632563f592f138c87 (diff)
Merge pull request #557 from ScrapCodes/style. Closes #557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.

Author: Patrick Wendell <pwendell@gmail.com>
Author: Prashant Sharma <scrapcodes@gmail.com>

== Merge branch commits ==

commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma <scrapcodes@gmail.com>
Date:   Sun Feb 9 17:39:07 2014 +0530

    scala style fixes

commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell <pwendell@gmail.com>
Date:   Fri Jan 24 11:22:53 2014 -0800

    Adding scalastyle snapshot
Diffstat (limited to 'bagel/src/main')
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 55
1 file changed, 32 insertions(+), 23 deletions(-)
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 44e26bbb9e..281216612f 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,22 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
* @param sc [[org.apache.spark.SparkContext]] to use for the program.
- * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
- * the vertex id.
- * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
- * empty array, i.e. sc.parallelize(Array[K, Message]()).
- * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
- * message before sending (which often involves network I/O).
- * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
- * and provides the result to each vertex in the next superstep.
+ * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+ * Key will be the vertex id.
+ * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+ * this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+ * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+ * given vertex into one message before sending (which often involves network I/O).
+ * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+ * after each superstep and provides the result to each vertex in the next
+ * superstep.
* @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
- * Defaults to caching in memory.
- * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
- * optional Aggregator and the current superstep,
+ * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+ * intermediate RDDs in each superstep. Defaults to caching in memory.
+ * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+ * the Vertex, optional Aggregator and the current superstep,
* and returns a set of (Vertex, outgoing Messages) pairs
* @tparam K key
* @tparam V vertex type
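For context, a minimal sketch of how this run overload family might be invoked. PRVertex, PRMessage, and the PageRank-style compute body are hypothetical illustrations, not part of this commit; they assume Bagel's Vertex trait (with an active flag) and Message[K] trait (with a targetId) defined elsewhere in this file.

import org.apache.spark.SparkContext
import org.apache.spark.bagel.{Bagel, Message, Vertex}

// Hypothetical vertex and message types, for illustration only.
class PRVertex(val value: Double, val outEdges: Array[String],
    val active: Boolean) extends Vertex with Serializable

class PRMessage(val targetId: String, val value: Double)
  extends Message[String] with Serializable

object BagelRunSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "BagelRunSketch")
    val verts = sc.parallelize(Seq(
      ("a", new PRVertex(1.0, Array("b"), active = true)),
      ("b", new PRVertex(1.0, Array("a"), active = true))))
    // Often an empty initial message set, as the scaladoc above notes.
    val msgs = sc.parallelize(Array[(String, PRMessage)]())

    // Uses the overload with the default HashPartitioner, DefaultCombiner
    // and default storage level; messages arrive as Option[Array[M]].
    val result = Bagel.run(sc, verts, msgs, numPartitions = 2) {
      (self: PRVertex, inbox: Option[Array[PRMessage]], superstep: Int) =>
        val sum = inbox.map(_.map(_.value).sum).getOrElse(0.0)
        val updated =
          new PRVertex(0.15 + 0.85 * sum, self.outEdges, superstep < 10)
        val outbox = updated.outEdges.map(dest =>
          new PRMessage(dest, updated.value / updated.outEdges.size))
        (updated, outbox)
    }
    result.collect().foreach { case (id, v) => println(id + ": " + v.value) }
  }
}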
@@ -71,7 +72,7 @@ object Bagel extends Logging {
var msgs = messages
var noActivity = false
do {
- logInfo("Starting superstep "+superstep+".")
+ logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
val aggregated = agg(verts, aggregator)
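The aggregated value computed by agg(verts, aggregator) above is handed to every vertex in the next superstep. As a hedged sketch of what such an aggregator can look like, assuming Bagel's two-method Aggregator[V, A] trait, this hypothetical ActiveVertexCounter reduces the graph to a count of still-active vertices:

import org.apache.spark.bagel.{Aggregator, Vertex}

// Hypothetical aggregator: counts how many vertices are still active.
class ActiveVertexCounter[V <: Vertex]
  extends Aggregator[V, Int] with Serializable {
  def createAggregator(vert: V): Int = if (vert.active) 1 else 0
  def mergeAggregators(a: Int, b: Int): Int = a + b
}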
@@ -97,7 +98,8 @@ object Bagel extends Logging {
verts
}
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
+ * storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -106,8 +108,8 @@ object Bagel extends Logging {
partitioner: Partitioner,
numPartitions: Int
)(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+ compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
+ combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
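This overload still takes an explicit Combiner. As a rough sketch of one, reusing the hypothetical PRMessage type from the example above and assuming Bagel's three-method Combiner[M, C] trait, a SumCombiner (a hypothetical name) could collapse all messages bound for a vertex into a single Double before any shuffle:

import org.apache.spark.bagel.Combiner

// Hypothetical combiner: merges all messages for one vertex into their sum,
// so only one value per target crosses the network.
class SumCombiner extends Combiner[PRMessage, Double] with Serializable {
  def createCombiner(msg: PRMessage): Double = msg.value
  def mergeMsg(combiner: Double, msg: PRMessage): Double = combiner + msg.value
  def mergeCombiners(a: Double, b: Double): Double = a + b
}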
/** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +129,8 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
- * and default storage level
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+ * [[org.apache.spark.HashPartitioner]] and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -138,9 +140,13 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
+ DEFAULT_STORAGE_LEVEL)(compute)
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+ /**
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+ * default [[org.apache.spark.HashPartitioner]]
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -158,7 +164,8 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+ * default [[org.apache.spark.HashPartitioner]],
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +178,8 @@ object Bagel extends Logging {
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
/**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+ * the default [[org.apache.spark.HashPartitioner]]
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +235,9 @@ object Bagel extends Logging {
})
numMsgs += newMsgs.size
- if (newVert.active)
+ if (newVert.active) {
numActiveVerts += 1
+ }
Some((newVert, newMsgs))
}.persist(storageLevel)
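The active flag counted in this hunk is what ultimately ends the do/while loop shown earlier: once no vertex is active and no new messages are produced, the run terminates. A minimal hedged sketch of a vertex that retires itself (CountdownVertex is a hypothetical name; the compute function would decrement remaining each superstep):

import org.apache.spark.bagel.Vertex

// Hypothetical vertex that deactivates after a fixed number of supersteps;
// when every vertex is inactive and no messages remain, Bagel stops iterating.
class CountdownVertex(val value: Double, val remaining: Int)
  extends Vertex with Serializable {
  def active: Boolean = remaining > 0
}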