author     Reynold Xin <rxin@databricks.com>    2015-05-28 17:55:22 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-05-28 17:55:22 -0700
commit     3af0b3136e4b7dea52c413d640653ccddc638574 (patch)
tree       4963a880985f4833f306f34d702ffbc0f1eeec3b /streaming
parent     1bd63e82fdb6ee57c61051430d63685b801df016 (diff)
[SPARK-7927] whitespace fixes for streaming.
So we can enable a whitespace enforcement rule in the style checker to save code review time.

Author: Reynold Xin <rxin@databricks.com>

Closes #6475 from rxin/whitespace-streaming and squashes the following commits:

810dae4 [Reynold Xin] Fixed tests.
89068ad [Reynold Xin] [SPARK-7927] whitespace fixes for streaming.
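All of the hunks below are of this mechanical kind. As a minimal standalone sketch of the spacing conventions being enforced (the identifiers here are hypothetical and not taken from the patch):

    object WhitespaceStyleSketch {
      // Flagged style (as removed in the hunks below):
      //   val counts: Map[String,Int] = ...           // no space after the comma
      //   fileStream(dir, filter, newFilesOnly=true)  // no spaces around `=` in a named argument
      //   while(!stopped) { ... }                     // no space after `while`

      // Enforced style (as added in the hunks below):
      val counts: Map[String, Int] = Map("a" -> 1)

      def drain(queue: scala.collection.mutable.Queue[Int]): Int = {
        var popped = 0
        while (queue.nonEmpty) {  // space between the keyword and the condition
          queue.dequeue()
          popped += 1
        }
        popped
      }
    }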
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala | 2
19 files changed, 40 insertions, 35 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 5e58ed7148..25842d5025 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -461,7 +461,7 @@ class StreamingContext private[streaming] (
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
- directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+ directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
require(bytes.length == recordLength, "Byte array does not have correct length. " +
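The hunk above sits inside binaryRecordsStream, which reads fixed-length binary records from files appearing in a directory. A hedged usage sketch, with the directory path and record length made up for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object BinaryRecordsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("binary-records-sketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Every new file under the directory is split into fixed 16-byte records;
        // any record that is not exactly 16 bytes trips the require() shown in the hunk above.
        val records = ssc.binaryRecordsStream("/tmp/binary-input", recordLength = 16)
        records.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }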
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 93baad19e3..959ac9c177 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -227,7 +227,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- :JavaPairDStream[K, JIterable[V]] = {
+ : JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
.mapValues(asJavaIterable _)
}
@@ -247,7 +247,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ):JavaPairDStream[K, JIterable[V]] = {
+ ): JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
.mapValues(asJavaIterable _)
}
@@ -262,7 +262,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* batching interval
*/
def reduceByKeyAndWindow(reduceFunc: JFunction2[V, V, V], windowDuration: Duration)
- :JavaPairDStream[K, V] = {
+ : JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
@@ -281,7 +281,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
- ):JavaPairDStream[K, V] = {
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index c858647c64..6efcc193bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -659,7 +659,7 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
- val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index eca69f0018..6c1fab5674 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -69,7 +69,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* processing semantics are undefined.
*/
private[streaming]
-class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
+class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
@@ -251,7 +251,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
- val fileRDDs = files.map(file =>{
+ val fileRDDs = files.map { file =>
val rdd = serializableConfOpt.map(_.value) match {
case Some(config) => context.sparkContext.newAPIHadoopFile(
file,
@@ -267,7 +267,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
"Refer to the streaming programming guide for more details.")
}
rdd
- })
+ }
new UnionRDD(context.sparkContext, fileRDDs)
}
@@ -294,7 +294,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
- generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+ generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
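The filesToRDD hunk above replaces a lambda wrapped in both parentheses and braces with a single brace block. A standalone sketch of the two forms, using hypothetical values rather than anything from the patch:

    object ClosureSyntaxSketch {
      val files = Seq("part-00000", "part-00001")

      // Form removed above: a parenthesised lambda whose body is a brace block.
      val lengthsBefore = files.map(file => {
        file.length
      })

      // Form added above: the collection method takes the closure as a single brace block.
      val lengthsAfter = files.map { file =>
        file.length
      }
    }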
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fda22eb6ec..358e4c66df 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.StreamingContext.rddToFileName
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
*/
-class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+class PairDStreamFunctions[K, V](self: DStream[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
extends Serializable
{
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index df9f7f140e..6a583bf2a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -38,7 +38,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
- ) extends DStream[(K,V)](parent.ssc) {
+ ) extends DStream[(K, V)](parent.ssc) {
require(_windowDuration.isMultipleOf(parent.slideDuration),
"The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
@@ -58,7 +58,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
super.persist(StorageLevel.MEMORY_ONLY_SER)
reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowDuration: Duration = _windowDuration
+ def windowDuration: Duration = _windowDuration
override def dependencies: List[DStream[_]] = List(reducedStream)
@@ -68,7 +68,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
override def parentRememberDuration: Duration = rememberDuration + windowDuration
- override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
+ override def persist(storageLevel: StorageLevel): DStream[(K, V)] = {
super.persist(storageLevel)
reducedStream.persist(storageLevel)
this
@@ -118,7 +118,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
// Get the RDD of the reduced value of the previous window
val previousWindowRDD =
- getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+ getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
// Make the list of RDDs that needs to cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 7757ccac09..e0ffd5d86b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -25,19 +25,19 @@ import scala.reflect.ClassTag
private[streaming]
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
- parent: DStream[(K,V)],
+ parent: DStream[(K, V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true
- ) extends DStream[(K,C)] (parent.ssc) {
+ ) extends DStream[(K, C)] (parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
- override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ override def compute(validTime: Time): Option[RDD[(K, C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](
createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index de8718d0a8..621d6dff78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -51,7 +51,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map(t => {
val itr = t._2._2.iterator
- val headOption = if(itr.hasNext) Some(itr.next) else None
+ val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
})
updateFuncLocal(i)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 899865a906..4efba039f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -44,7 +44,7 @@ class WindowedDStream[T: ClassTag](
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
- def windowDuration: Duration = _windowDuration
+ def windowDuration: Duration = _windowDuration
override def dependencies: List[DStream[_]] = List(parent)
@@ -68,7 +68,7 @@ class WindowedDStream[T: ClassTag](
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
logDebug("Using normal union for windowing at " + validTime)
- new UnionRDD(ssc.sc,rddsInWindow)
+ new UnionRDD(ssc.sc, rddsInWindow)
}
Some(windowRDD)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 4bebcc5aa7..0588517a2d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -164,7 +164,7 @@ private[streaming] class BlockGenerator(
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
try {
- while(!stopped) {
+ while (!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 97db9ded83..8df542b367 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -17,8 +17,9 @@
package org.apache.spark.streaming.receiver
+import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
+
import org.apache.spark.{Logging, SparkConf}
-import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
/** Provides waitToPush() method to limit the rate at which receivers consume data.
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f73f7e705e..f1504b09c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -230,7 +230,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
class ReceiverLauncher {
@transient val env = ssc.env
@volatile @transient private var running = false
- @transient val thread = new Thread() {
+ @transient val thread = new Thread() {
override def run() {
try {
SparkEnv.set(env)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 87ba4f84a9..fe6328b1ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -200,7 +200,7 @@ private[streaming] class FileBasedWriteAheadLog(
/** Initialize the log directory or recover existing logs inside the directory */
private def initializeOrRecover(): Unit = synchronized {
val logDirectoryPath = new Path(logDirectory)
- val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 4d968f8bfa..408936653c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -27,7 +27,7 @@ object RawTextHelper {
* Splits lines and counts the words.
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
- val map = new OpenHashMap[String,Long]
+ val map = new OpenHashMap[String, Long]
var i = 0
var j = 0
while (iter.hasNext) {
@@ -98,7 +98,7 @@ object RawTextHelper {
* before real workload starts.
*/
def warmUp(sc: SparkContext) {
- for(i <- 0 to 1) {
+ for (i <- 0 to 1) {
sc.parallelize(1 to 200000, 1000)
.map(_ % 1331).map(_.toString)
.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index f269cb74e0..08faeaa58f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -255,7 +255,7 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
+ s1.map(x => (x, 1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
@@ -427,9 +427,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("updateStateByKey - object lifecycle") {
val inputData =
Seq(
- Seq("a","b"),
+ Seq("a", "b"),
null,
- Seq("a","c","a"),
+ Seq("a", "c", "a"),
Seq("c"),
null,
null
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 0122514f93..b74d67c63a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -418,7 +418,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
val servingThread = new Thread() {
override def run() {
try {
- while(true) {
+ while (true) {
logInfo("Accepting connections on port " + port)
val clientSocket = serverSocket.accept()
if (startLatch.getCount == 1) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f8e8030791..e36c7914b1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -732,7 +732,9 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
def onStop() {
// Simulate slow receiver by waiting for all records to be produced
- while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+ while (!SlowTestReceiver.receivedAllRecords) {
+ Thread.sleep(100)
+ }
// no clean to be done, the receiving thread should stop on it own
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 312cce408c..1dc8960d60 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -133,8 +133,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
- for(i <- 1 until seq.size) {
- if (seq(i - 1) > seq(i)) return false
+ for (i <- 1 until seq.size) {
+ if (seq(i - 1) > seq(i)) {
+ return false
+ }
}
true
}
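The helper above is only reformatted, not rewritten. For reference, an equivalent formulation using sliding windows, shown purely as an illustration and not part of the patch:

    object MonotonicCheckSketch {
      // True when the sequence never decreases, matching the loop above
      // (equal neighbours are allowed; length 0 or 1 passes trivially).
      def isInIncreasingOrder(seq: Seq[Long]): Boolean =
        seq.sliding(2).forall {
          case Seq(a, b) => a <= b
          case _         => true
        }

      // isInIncreasingOrder(Seq(100L, 200L, 200L, 500L)) == true
      // isInIncreasingOrder(Seq(100L, 50L))              == false
    }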
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 2a0f45830e..c9175d61b1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -64,7 +64,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (0)
// onBatchStarted
- val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+ val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))