author    Prashant Sharma <prashant.s@imaginea.com>  2014-02-09 22:17:52 -0800
committer Reynold Xin <rxin@apache.org>              2014-02-09 22:17:52 -0800
commit 919bd7f669c61500eee7231298d9880b320eb6f3 (patch)
tree   5cdcf197aef425b6be47b676f8d7fe3d1e2e8c34 /streaming
parent 2182aa3c55737a90e0ff200eede7146b440801a3 (diff)
Merge pull request #567 from ScrapCodes/style2.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2

Continuation of PR #557. With this, all Scala style errors are fixed across the code base!

The reason for creating a separate PR was to avoid interrupting an already reviewed and ready-to-merge PR. Hope this gets reviewed soon and merged too.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #567 and squashes the following commits:

3b1ec30 [Prashant Sharma] scala style fixes
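The changes below are mostly mechanical line wraps to satisfy the 100-character scalastyle limit added to the build. As a minimal sketch of the wrapping pattern (not part of the patch; logInfo here is a stand-in for the Logging trait method and all names are illustrative only):

    object StyleWrapExamples {
      // Stand-in for the Logging trait's logInfo; for illustration only.
      private def logInfo(msg: String): Unit = println(msg)

      // Long string concatenations are split inside the still-open parentheses, with the
      // "+" carried onto the continuation line and indented two extra spaces:
      def reportCheckpoint(checkpointTime: Long, checkpointFile: String): Unit = {
        logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
          + "'")
      }

      // When even the signature overflows, the patch moves the opening brace of the
      // method body onto its own line:
      def read(checkpointDir: String): Option[String] =
      {
        Option(checkpointDir).filter(_.nonEmpty)
      }
    }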
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Interval.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 45
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala | 21
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala | 8
18 files changed, 108 insertions(+), 65 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 4d778dc4d4..baf80fe2a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,7 +128,8 @@ class CheckpointWriter(
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
- logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
+ logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
+ + "'")
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
@@ -167,11 +168,13 @@ class CheckpointWriter(
return
} catch {
case ioe: IOException =>
- logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
+ logWarning("Error in attempt " + attempts + " of writing checkpoint to "
+ + checkpointFile, ioe)
reset()
}
}
- logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'")
+ logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
+ + checkpointFile + "'")
}
}
@@ -220,7 +223,8 @@ class CheckpointWriter(
private[streaming]
object CheckpointReader extends Logging {
- def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
+ def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
+ {
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0683113bd0..fde46705d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
- //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+ //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+ // " is very low")
assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 04c994c136..16479a0127 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -33,7 +33,8 @@ class Interval(val beginTime: Time, val endTime: Time) {
def < (that: Interval): Boolean = {
if (this.duration != that.duration) {
- throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
+ throw new Exception("Comparing two intervals with different durations [" + this + ", "
+ + that + "]")
}
this.endTime < that.endTime
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64fe204cdf..7aa7ead29b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -78,8 +78,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
+ * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
dstream.countByWindow(windowDuration, slideDuration)
@@ -87,8 +87,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD contains the count of distinct elements in
- * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
- * Spark's default number of partitions.
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -103,8 +103,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD contains the count of distinct elements in
- * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
- * partitions.
+ * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 62cfa0a229..4dcd0e4c51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
- * partitioning of each RDD.
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
dstream.reduceByKey(func, partitioner)
@@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
- * information.
+ * combineByKey for RDDs. Please refer to combineByKey in
+ * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
- * information.
+ * combineByKey for RDDs. Please refer to combineByKey in
+ * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
@@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @param filterFunc function to filter expired key-value pairs;
* only pairs that satisfy the function are retained
* set this to null if you do not want to filter
@@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @tparam S State type
*/
def updateStateByKey[S](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 921b56143a..2268160dcc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
* @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
- * file system or an HDFS, HTTP, HTTPS, or FTP URL.
+ * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the
+ * local file system or an HDFS, HTTP, HTTPS, or FTP URL.
*/
def this(
master: String,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 906a16e508..903e3f3c9b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
override def toString() = {
- "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
+ "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
+ currentCheckpointFiles.mkString("\n") + "\n]"
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 27303390d9..226844c228 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
} else {
// Time is valid, but check it it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
- logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+ logWarning("isTimeValid called with " + time + " where as last valid time is " +
+ lastValidTime)
}
lastValidTime = time
true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index ce153f065d..0dc6704603 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
+ extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
/**
@@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}
/**
- * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into
- * appropriately named blocks at regular intervals. This class starts two threads,
+ * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
+ * them into appropriately named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of as a block,
* the other to push the blocks into the block manager.
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb9df2f48e..f3c58aede0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration}
* these functions.
*/
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-extends Serializable {
+ extends Serializable {
private[streaming] def ssc = self.ssc
- private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
+ = {
new HashPartitioner(numPartitions)
}
@@ -63,8 +64,8 @@ extends Serializable {
}
/**
- * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
- * is used to control the partitioning of each RDD.
+ * Return a new DStream by applying `groupByKey` on each RDD. The supplied
+ * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -94,8 +95,8 @@ extends Serializable {
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
- * partitioning of each RDD.
+ * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -113,7 +114,8 @@ extends Serializable {
mergeCombiner: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): DStream[(K, C)] = {
- new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+ mapSideCombine)
}
/**
@@ -138,7 +140,8 @@ extends Serializable {
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+ {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -170,7 +173,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
@@ -239,7 +243,8 @@ extends Serializable {
slideDuration: Duration,
numPartitions: Int
): DStream[(K, V)] = {
- reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+ reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+ defaultPartitioner(numPartitions))
}
/**
@@ -315,7 +320,8 @@ extends Serializable {
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
- * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @param filterFunc Optional function to filter expired key-value pairs;
* only pairs that satisfy the function are retained
*/
@@ -373,7 +379,8 @@ extends Serializable {
* [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
@@ -395,7 +402,8 @@ extends Serializable {
* this function may generate a different a tuple with a different key
* than the input key. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+ * DStream
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @tparam S State type
*/
@@ -438,7 +446,8 @@ extends Serializable {
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
+ : DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -566,7 +575,8 @@ extends Serializable {
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -580,7 +590,7 @@ extends Serializable {
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf
- ) {
+ ) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
@@ -596,7 +606,8 @@ extends Serializable {
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 7a6b1ea35e..ca0a8ae478 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val invReduceF = invReduceFunc
val currentTime = validTime
- val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+ val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
+ currentTime)
val previousWindow = currentWindow - slideDuration
logDebug("Window time = " + windowDuration)
@@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
// Cogroup the reduced RDDs and merge the reduced values
- val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
+ partitioner)
//val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
val numOldValues = oldRDDs.size
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 4ecba03ab5..57429a1532 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
val rdds = new ArrayBuffer[RDD[T]]()
parents.map(_.getOrCompute(validTime)).foreach(_ match {
case Some(rdd) => rdds += rdd
- case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
+ + validTime)
})
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 6301772468..24289b714f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag](
_slideDuration: Duration)
extends DStream[T](parent.ssc) {
- if (!_windowDuration.isMultipleOf(parent.slideDuration))
+ if (!_windowDuration.isMultipleOf(parent.slideDuration)) {
throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
"must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ }
- if (!_slideDuration.isMultipleOf(parent.slideDuration))
+ if (!_slideDuration.isMultipleOf(parent.slideDuration)) {
throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
"must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ }
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index b5f11d3440..c7306248b1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)))
- private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
+ private lazy val checkpointWriter =
+ if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+ new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 0d9733fa69..e4fa163f2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.util.AkkaUtils
private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
+ extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+ extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
+ extends NetworkInputTrackerMessage
/**
* This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}
if (!networkInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+ actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
+ "NetworkInputTracker")
receiverExecutor.start()
logInfo("NetworkInputTracker started")
}
@@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
- logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+ logInfo("Registered receiver for network stream " + streamId + " from "
+ + sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
@@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
})
// Right now, we only honor preferences if all receivers have them
- val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+ val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
+ .reduce(_ && _)
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
- val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+ val receiversWithPreferences =
+ receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
}
else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 3063cf10a3..18811fc2b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
private[spark] class StreamingListenerBus() extends Logging {
- private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+ private val listeners = new ArrayBuffer[StreamingListener]()
+ with SynchronizedBuffer[StreamingListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
* an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6a45bc2f8a..2bb616cfb8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
}
}
}
- if (!done)
+ if (!done) {
logError("Could not generate file " + hadoopFile)
- else
+ } else {
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ }
Thread.sleep(interval)
localFile.delete()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 179fd75939..2b8cdb72b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
}
} else {
// Calculate how much time we should sleep to bring ourselves to the desired rate.
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
- val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+ // Based on throttler in Kafka
+ // scalastyle:off
+ // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+ // scalastyle:on
+ val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs),
+ SECONDS)
if (sleepTime > 0) Thread.sleep(sleepTime)
waitToWrite(numBytes)
}
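The last hunk also shows the escape hatch used when a line genuinely cannot be broken (here, a long URL): scalastyle's off/on comment markers disable checking for the enclosed lines only. A minimal sketch of the same pattern (the object and value names are illustrative, not taken from the patch):

    object ThrottlerReference {
      // scalastyle:off
      // Based on https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala which runs well past the 100-character limit.
      // scalastyle:on
      val inspiration: String = "Kafka Throttler"
    }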