author     Kousuke Saruta <sarutak@oss.nttdata.co.jp>  2016-01-11 21:06:22 -0800
committer  Reynold Xin <rxin@databricks.com>  2016-01-11 21:06:22 -0800
commit     39ae04e6b714e085a1341aa84d8fc5fc827d5f35 (patch)
tree       98f9bf78a4309c4c4cd061d4ee0a9f4621a5813d /streaming
parent     aaa2c3b628319178ca1f3f68966ff253c2de49cb (diff)
[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")
Fix the style violation (space before "," and ":"). This PR is a follow-up for #10643.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #10685 from sarutak/SPARK-12692-followup-streaming.
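The rule at issue is the Scalastyle check that forbids whitespace before "," and ":". A minimal, hypothetical Scala sketch (identifiers not taken from the patch) of what the checker flags and what this patch normalizes to:

// Flagged: space before ":" in type ascriptions and before "," in parameter lists.
class Counter(name : String , start : Int) {
  def add(delta : Int) : Int = start + delta
}

// Accepted: no space before ":" or ",".
class CounterFixed(name: String, start: Int) {
  def add(delta: Int): Int = start + delta
}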
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 36
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala | 8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 2
21 files changed, 67 insertions(+), 67 deletions(-)
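Most hunks below follow one pattern: identifiers with a trailing underscore (ssc_, cp_, fs_, sc_, ...) are renamed to use a leading underscore (_ssc, _cp, _fs, _sc). A trailing underscore forces the offending space, because `ssc_:` written without the space would be read by the Scala lexer as a single identifier; with a leading underscore the type ascription needs no space and satisfies the rule. A minimal sketch of the resulting lazy-caching pattern, modeled on the Checkpoint.scala hunk below but using a hypothetical standalone class:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class CheckpointDirAccess(checkpointDir: String, hadoopConf: Configuration) {
  // Leading underscore: "_fs: FileSystem" needs no space before ":".
  private var _fs: FileSystem = _

  // Lazily resolve and cache the FileSystem for the checkpoint directory.
  private def fs: FileSystem = synchronized {
    if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
    _fs
  }

  // Drop the cached handle so the next access re-resolves it.
  private def reset(): Unit = synchronized { _fs = null }
}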
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 86f01d2168..298cdc05ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -183,7 +183,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
- private var fs_ : FileSystem = _
+ private var _fs: FileSystem = _
@volatile private var latestCheckpointTime: Time = null
@@ -298,12 +298,12 @@ class CheckpointWriter(
}
private def fs = synchronized {
- if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
- fs_
+ if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+ _fs
}
private def reset() = synchronized {
- fs_ = null
+ _fs = null
}
}
@@ -370,8 +370,8 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
- extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
+ extends ObjectInputStream(_inputStream) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba509a1030..157ee92fd7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan
* of the context by `stop()` or by an exception.
*/
class StreamingContext private[streaming] (
- sc_ : SparkContext,
- cp_ : Checkpoint,
- batchDur_ : Duration
+ _sc: SparkContext,
+ _cp: Checkpoint,
+ _batchDur: Duration
) extends Logging {
/**
@@ -126,18 +126,18 @@ class StreamingContext private[streaming] (
}
- if (sc_ == null && cp_ == null) {
+ if (_sc == null && _cp == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
- private[streaming] val isCheckpointPresent = (cp_ != null)
+ private[streaming] val isCheckpointPresent = (_cp != null)
private[streaming] val sc: SparkContext = {
- if (sc_ != null) {
- sc_
+ if (_sc != null) {
+ _sc
} else if (isCheckpointPresent) {
- SparkContext.getOrCreate(cp_.createSparkConf())
+ SparkContext.getOrCreate(_cp.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
@@ -154,13 +154,13 @@ class StreamingContext private[streaming] (
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
- cp_.graph.setContext(this)
- cp_.graph.restoreCheckpointData()
- cp_.graph
+ _cp.graph.setContext(this)
+ _cp.graph.restoreCheckpointData()
+ _cp.graph
} else {
- require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
+ require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
- newGraph.setBatchDuration(batchDur_)
+ newGraph.setBatchDuration(_batchDur)
newGraph
}
}
@@ -169,15 +169,15 @@ class StreamingContext private[streaming] (
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(cp_.checkpointDir)
- cp_.checkpointDir
+ sc.setCheckpointDir(_cp.checkpointDir)
+ _cp.checkpointDir
} else {
null
}
}
private[streaming] val checkpointDuration: Duration = {
- if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+ if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
}
private[streaming] val scheduler = new JobScheduler(this)
@@ -246,7 +246,7 @@ class StreamingContext private[streaming] (
}
private[streaming] def initialCheckpoint: Checkpoint = {
- if (isCheckpointPresent) cp_ else null
+ if (isCheckpointPresent) _cp else null
}
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
@@ -460,7 +460,7 @@ class StreamingContext private[streaming] (
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
- val conf = sc_.hadoopConfiguration
+ val conf = _sc.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 733147f63e..a791a474c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index 695384deb3..b5f86fe779 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
- extends InputDStream[T](ssc_) {
+class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
+ extends InputDStream[T](_ssc) {
require(rdd != null,
"parameter rdd null is illegal, which will lead to NPE in the following transformation")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 3eff174c2b..a9ce1131ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
- @transient private var fileSystem : FileSystem = null
+ @transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index cb5b1f252e..1c2325409b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -73,13 +73,13 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
conf: Option[Configuration] = None)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- extends InputDStream[(K, V)](ssc_) {
+ extends InputDStream[(K, V)](_ssc) {
private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
@@ -128,8 +128,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L
- @transient private var path_ : Path = null
- @transient private var fs_ : FileSystem = null
+ @transient private var _path: Path = null
+ @transient private var _fs: FileSystem = null
override def start() { }
@@ -289,17 +289,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
}
private def directoryPath: Path = {
- if (path_ == null) path_ = new Path(directory)
- path_
+ if (_path == null) _path = new Path(directory)
+ _path
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
- fs_
+ if (_fs == null) _fs = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+ _fs
}
private def reset() {
- fs_ = null
+ _fs = null
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d60f418e5c..76f6230f36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -38,10 +38,10 @@ import org.apache.spark.util.Utils
* that requires running a receiver on the worker nodes, use
* [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
+abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+ extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 2442e4c01a..e003ddb96c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -24,8 +24,8 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
- receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
+ _ssc: StreamingContext,
+ receiver: Receiver[T]) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
receiver
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index ac73dca05a..409c565380 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
+ ) extends ReceiverInputDStream[T](_ssc) with Logging {
def getReceiver(): Receiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 565b137228..49d8f14f4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -35,11 +35,11 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
* define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
- extends InputDStream[T](ssc_) {
+abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
+ extends InputDStream[T](_ssc) {
/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e70fc87c39..4414774791 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -31,12 +31,12 @@ import org.apache.spark.util.NextIterator
private[streaming]
class SocketInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_) {
+ ) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebbe139a2c..fedffb2395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
preservePartitioning: Boolean,
- initialRDD : Option[RDD[(K, S)]]
+ initialRDD: Option[RDD[(K, S)]]
) extends DStream[(K, S)](parent.ssc) {
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -43,7 +43,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
override val mustCheckpoint = true
private [this] def computeUsingPreviousRDD (
- parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
+ parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
@@ -98,7 +98,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
+ val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 639f4259e2..3376cd557d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -108,7 +108,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
def onStop()
/** Override this to specify a preferred location (hostname). */
- def preferredLocation : Option[String] = None
+ def preferredLocation: Option[String] = None
/**
* Store a single item of received data to Spark's memory.
@@ -257,11 +257,11 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
private var id: Int = -1
/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
- @transient private var _supervisor : ReceiverSupervisor = null
+ @transient private var _supervisor: ReceiverSupervisor = null
/** Set the ID of the DStream that this receiver is associated with. */
- private[streaming] def setReceiverId(id_ : Int) {
- id = id_
+ private[streaming] def setReceiverId(_id: Int) {
+ id = _id
}
/** Attach Network Receiver executor to this receiver. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9d296c6d3e..25e7ae8262 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -186,7 +186,7 @@ class BasicOperationsSuite extends TestSuiteBase {
val output = Seq(1 to 8, 101 to 108, 201 to 208)
testOperation(
input,
- (s: DStream[Int]) => s.union(s.map(_ + 4)) ,
+ (s: DStream[Int]) => s.union(s.map(_ + 4)),
output
)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4d04138da0..4a6b91fbc7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSy
* A input stream that records the times of restore() invoked
*/
private[streaming]
-class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+class CheckpointInputDStream(_ssc: StreamingContext) extends InputDStream[Int](_ssc) {
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
override def start(): Unit = { }
override def stop(): Unit = { }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 4e56dfbd42..7bbbdebd9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -200,12 +200,12 @@ object MasterFailureTest extends Logging {
* the last expected output is generated. Finally, return
*/
private def runStreams[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
): Seq[T] = {
- var ssc = ssc_
+ var ssc = _ssc
var totalTimeRan = 0L
var isLastOutputGenerated = false
var isTimedOut = false
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index da0430e263..7a76cafc9a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -280,7 +280,7 @@ class StateMapSuite extends SparkFunSuite {
testSerialization(new KryoSerializer(conf), map, msg)
}
- private def testSerialization[T : ClassTag](
+ private def testSerialization[T: ClassTag](
serializer: Serializer,
map: OpenHashMapBasedStateMap[T, T],
msg: String): OpenHashMapBasedStateMap[T, T] = {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 0ae4c45988..197b3d1439 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -896,7 +896,7 @@ object SlowTestReceiver {
package object testPackage extends Assertions {
def test() {
val conf = new SparkConf().setMaster("local").setAppName("CreationSite test")
- val ssc = new StreamingContext(conf , Milliseconds(100))
+ val ssc = new StreamingContext(conf, Milliseconds(100))
try {
val inputStream = ssc.receiverStream(new TestReceiver)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 54eff2b214..239b10894a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -58,8 +58,8 @@ private[streaming] class DummyInputDStream(ssc: StreamingContext) extends InputD
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
-class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
- extends InputDStream[T](ssc_) {
+class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+ extends InputDStream[T](_ssc) {
def start() {}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 3bd8d086ab..b67189fbd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -107,8 +107,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
/** An input DStream with for testing rate controlling */
-private[streaming] class RateTestInputDStream(@transient ssc_ : StreamingContext)
- extends ReceiverInputDStream[Int](ssc_) {
+private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext)
+ extends ReceiverInputDStream[Int](_ssc) {
override def getReceiver(): Receiver[Int] = new RateTestReceiver(id)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index b5d6a24ce8..734dd93cda 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -154,7 +154,7 @@ abstract class CommonWriteAheadLogTests(
// Recover old files and generate a second set of log files
val dataToWrite2 = generateRandomData()
manualClock.advance(100000)
- writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching ,
+ writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching,
manualClock)
val logFiles2 = getLogFilesInDirectory(testDir)
assert(logFiles2.size > logFiles1.size)