diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-02-09 22:17:52 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-02-09 22:17:52 -0800 |
commit | 919bd7f669c61500eee7231298d9880b320eb6f3 (patch) | |
tree | 5cdcf197aef425b6be47b676f8d7fe3d1e2e8c34 /streaming/src | |
parent | 2182aa3c55737a90e0ff200eede7146b440801a3 (diff) | |
download | spark-919bd7f669c61500eee7231298d9880b320eb6f3.tar.gz spark-919bd7f669c61500eee7231298d9880b320eb6f3.tar.bz2 spark-919bd7f669c61500eee7231298d9880b320eb6f3.zip |
Merge pull request #567 from ScrapCodes/style2.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2
Continuation of PR #557
With this all scala style errors are fixed across the code base !!
The reason for creating a separate PR was to not interrupt an already reviewed and ready to merge PR. Hope this gets reviewed soon and merged too.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes #567 and squashes the following commits:
3b1ec30 [Prashant Sharma] scala style fixes
Diffstat (limited to 'streaming/src')
18 files changed, 108 insertions, 65 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 4d778dc4d4..baf80fe2a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -128,7 +128,8 @@ class CheckpointWriter( while (attempts < MAX_ATTEMPTS && !stopped) { attempts += 1 try { - logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'") + logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + + "'") // Write checkpoint to temp file fs.delete(tempFile, true) // just in case it exists @@ -167,11 +168,13 @@ class CheckpointWriter( return } catch { case ioe: IOException => - logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe) + logWarning("Error in attempt " + attempts + " of writing checkpoint to " + + checkpointFile, ioe) reset() } } - logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'") + logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + + checkpointFile + "'") } } @@ -220,7 +223,8 @@ class CheckpointWriter( private[streaming] object CheckpointReader extends Logging { - def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { + def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = + { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 0683113bd0..fde46705d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -153,7 +153,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") - //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + + // " is very low") assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala index 04c994c136..16479a0127 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -33,7 +33,8 @@ class Interval(val beginTime: Time, val endTime: Time) { def < (that: Interval): Boolean = { if (this.duration != that.duration) { - throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]") + throw new Exception("Comparing two intervals with different durations [" + this + ", " + + that + "]") } this.endTime < that.endTime } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64fe204cdf..7aa7ead29b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -78,8 +78,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the - * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in + * the window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = { dstream.countByWindow(windowDuration, slideDuration) @@ -87,8 +87,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -103,8 +103,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 62cfa0a229..4dcd0e4c51 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * thepartitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { dstream.reduceByKey(func, partitioner) @@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], @@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @param filterFunc function to filter expired key-value pairs; * only pairs that satisfy the function are retained * set this to null if you do not want to filter @@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @tparam S State type */ def updateStateByKey[S]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 921b56143a..2268160dcc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches * @param sparkHome The SPARK_HOME directory on the slave nodes - * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local - * file system or an HDFS, HTTP, HTTPS, or FTP URL. + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the + * local file system or an HDFS, HTTP, HTTPS, or FTP URL. */ def this( master: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 906a16e508..903e3f3c9b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } override def toString() = { - "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + + currentCheckpointFiles.mkString("\n") + "\n]" } @throws(classOf[IOException]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 27303390d9..226844c228 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) } else { // Time is valid, but check it it is more than lastValidTime if (lastValidTime != null && time < lastValidTime) { - logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + logWarning("isTimeValid called with " + time + " where as last valid time is " + + lastValidTime) } lastValidTime = time true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index ce153f065d..0dc6704603 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) + extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } /** - * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into - * appropriately named blocks at regular intervals. This class starts two threads, + * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts + * them into appropriately named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch of as a block, * the other to push the blocks into the block manager. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb9df2f48e..f3c58aede0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration} * these functions. */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) -extends Serializable { + extends Serializable { private[streaming] def ssc = self.ssc - private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) + = { new HashPartitioner(numPartitions) } @@ -63,8 +64,8 @@ extends Serializable { } /** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] - * is used to control the partitioning of each RDD. + * Return a new DStream by applying `groupByKey` on each RDD. The supplied + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -94,8 +95,8 @@ extends Serializable { /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -113,7 +114,8 @@ extends Serializable { mergeCombiner: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true): DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, + mapSideCombine) } /** @@ -138,7 +140,8 @@ extends Serializable { * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = + { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -170,7 +173,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -239,7 +243,8 @@ extends Serializable { slideDuration: Duration, numPartitions: Int ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, + defaultPartitioner(numPartitions)) } /** @@ -315,7 +320,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. * @param filterFunc Optional function to filter expired key-value pairs; * only pairs that satisfy the function are retained */ @@ -373,7 +379,8 @@ extends Serializable { * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @tparam S State type */ def updateStateByKey[S: ClassTag]( @@ -395,7 +402,8 @@ extends Serializable { * this function may generate a different a tuple with a different key * than the input key. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. * @tparam S State type */ @@ -438,7 +446,8 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) + : DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -566,7 +575,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -580,7 +590,7 @@ extends Serializable { valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf - ) { + ) { val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) @@ -596,7 +606,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index 7a6b1ea35e..ca0a8ae478 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val invReduceF = invReduceFunc val currentTime = validTime - val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, + currentTime) val previousWindow = currentWindow - slideDuration logDebug("Window time = " + windowDuration) @@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], + partitioner) //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ val numOldValues = oldRDDs.size diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 4ecba03ab5..57429a1532 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + + validTime) }) if (rdds.size > 0) { Some(new UnionRDD(ssc.sc, rdds)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 6301772468..24289b714f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag]( _slideDuration: Duration) extends DStream[T](parent.ssc) { - if (!_windowDuration.isMultipleOf(parent.slideDuration)) + if (!_windowDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } - if (!_slideDuration.isMultipleOf(parent.slideDuration)) + if (!_slideDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } // Persist parent level by default, as those RDDs are going to be obviously reused. parent.persist(StorageLevel.MEMORY_ONLY_SER) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index b5f11d3440..c7306248b1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime))) - private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + private lazy val checkpointWriter = + if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 0d9733fa69..e4fa163f2e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext} import org.apache.spark.util.AkkaUtils private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage -private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) + extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) + extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) + extends NetworkInputTrackerMessage /** * This class manages the execution of the receivers of NetworkInputDStreams. Instance of @@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } if (!networkInputStreams.isEmpty) { - actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") + actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), + "NetworkInputTracker") receiverExecutor.start() logInfo("NetworkInputTracker started") } @@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) - logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) + logInfo("Registered receiver for network stream " + streamId + " from " + + sender.path.address) sender ! true } case AddBlocks(streamId, blockIds, metadata) => { @@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { }) // Right now, we only honor preferences if all receivers have them - val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined) + .reduce(_ && _) // Create the parallel collection of receivers to distributed them on the worker nodes val tempRDD = if (hasLocationPreferences) { - val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) + val receiversWithPreferences = + receivers.map(r => (r, Seq(r.getLocationPreference().toString))) ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) } else { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 3063cf10a3..18811fc2b0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ private[spark] class StreamingListenerBus() extends Logging { - private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener] + private val listeners = new ArrayBuffer[StreamingListener]() + with SynchronizedBuffer[StreamingListener] /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 6a45bc2f8a..2bb616cfb8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } } - if (!done) + if (!done) { logError("Could not generate file " + hadoopFile) - else + } else { logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + } Thread.sleep(interval) localFile.delete() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala index 179fd75939..2b8cdb72b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala @@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu } } else { // Calculate how much time we should sleep to bring ourselves to the desired rate. - // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) - val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + // Based on throttler in Kafka + // scalastyle:off + // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) + // scalastyle:on + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), + SECONDS) if (sleepTime > 0) Thread.sleep(sleepTime) waitToWrite(numBytes) } |