author    Kousuke Saruta <sarutak@oss.nttdata.co.jp>  2016-01-11 21:06:22 -0800
committer Reynold Xin <rxin@databricks.com>           2016-01-11 21:06:22 -0800
commit    39ae04e6b714e085a1341aa84d8fc5fc827d5f35 (patch)
tree      98f9bf78a4309c4c4cd061d4ee0a9f4621a5813d
parent    aaa2c3b628319178ca1f3f68966ff253c2de49cb (diff)
[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")
Fix the style violation (space before "," and ":"). This PR is a follow-up for #10643.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #10685 from sarutak/SPARK-12692-followup-streaming.
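For context, a minimal hypothetical Scala sketch (not taken from the patch) of the Scalastyle rule being enforced — no whitespace before a comma or a colon:

// Violates the rule (space before ":" and before ","):
def lookup(key : String, default : Int) : Int = default

// Compliant form, matching the changes in this patch:
def lookup(key: String, default: Int): Int = default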
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala | 14
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala | 8
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 18
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 4
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 2
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 4
-rw-r--r--  project/MimaExcludes.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 36
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala | 8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 2
30 files changed, 108 insertions(+), 96 deletions(-)
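One recurring pattern in the diff below is worth a note (an explanatory sketch, not part of the patch): constructor parameters such as ssc_, cp_, and fs_ end in an underscore, and in Scala a trailing underscore fuses with a following operator character, so ssc_: would lex as a single identifier. The old code therefore needed a space before the colon, which the style rule now forbids; renaming to a leading underscore (_ssc, _cp, _fs) satisfies the rule, at the cost of the MiMa exclusions added in project/MimaExcludes.scala for the renamed Logging field.

// Hypothetical sketch of the trade-off (String stands in for StreamingContext):
class Before(ssc_ : String)  // trailing underscore forces a space before ":", now a style violation
class After(_ssc: String)    // leading underscore parses cleanly and satisfies the rule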
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index ce1a62060e..50216b9bd4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -23,15 +23,15 @@ import java.net.ServerSocket
import java.util.Random
/** Represents a page view on a website with associated dimension data. */
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
extends Serializable {
- override def toString() : String = {
+ override def toString(): String = {
"%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
}
}
object PageView extends Serializable {
- def fromString(in : String) : PageView = {
+ def fromString(in: String): PageView = {
val parts = in.split("\t")
new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
}
@@ -58,9 +58,9 @@ object PageViewGenerator {
404 -> .05)
val userZipCode = Map(94709 -> .5,
94117 -> .5)
- val userID = Map((1 to 100).map(_ -> .01) : _*)
+ val userID = Map((1 to 100).map(_ -> .01): _*)
- def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+ def pickFromDistribution[T](inputMap: Map[T, Double]): T = {
val rand = new Random().nextDouble()
var total = 0.0
for ((item, prob) <- inputMap) {
@@ -72,7 +72,7 @@ object PageViewGenerator {
inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
}
- def getNextClickEvent() : String = {
+ def getNextClickEvent(): String = {
val id = pickFromDistribution(userID)
val page = pickFromDistribution(pages)
val status = pickFromDistribution(httpStatus)
@@ -80,7 +80,7 @@ object PageViewGenerator {
new PageView(page, status, zipCode, id).toString()
}
- def main(args : Array[String]) {
+ def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
System.exit(1)
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
index d87b86932d..aa530a7121 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -26,20 +26,20 @@ import org.slf4j.{Logger, LoggerFactory}
private[sink] trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
- @transient private var log_ : Logger = null
+ @transient private var _log: Logger = null
// Method to get or create the logger for this object
protected def log: Logger = {
- if (log_ == null) {
+ if (_log == null) {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
className = className.substring(0, className.length - 1)
}
- log_ = LoggerFactory.getLogger(className)
+ _log = LoggerFactory.getLogger(className)
}
- log_
+ _log
}
// Log methods that take only a String
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 1bfa35a8b3..74bd0165c6 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -41,12 +41,12 @@ import org.apache.spark.util.Utils
private[streaming]
class FlumeInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel, enableDecompression)
@@ -60,7 +60,7 @@ class FlumeInputDStream[T: ClassTag](
* which are not serializable.
*/
class SparkFlumeEvent() extends Externalizable {
- var event : AvroFlumeEvent = new AvroFlumeEvent()
+ var event: AvroFlumeEvent = new AvroFlumeEvent()
/* De-serialize from bytes. */
def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -75,12 +75,12 @@ class SparkFlumeEvent() extends Externalizable {
val keyLength = in.readInt()
val keyBuff = new Array[Byte](keyLength)
in.readFully(keyBuff)
- val key : String = Utils.deserialize(keyBuff)
+ val key: String = Utils.deserialize(keyBuff)
val valLength = in.readInt()
val valBuff = new Array[Byte](valLength)
in.readFully(valBuff)
- val value : String = Utils.deserialize(valBuff)
+ val value: String = Utils.deserialize(valBuff)
headers.put(key, value)
}
@@ -109,7 +109,7 @@ class SparkFlumeEvent() extends Externalizable {
}
private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
val event = new SparkFlumeEvent
event.event = in
event
@@ -118,13 +118,13 @@ private[streaming] object SparkFlumeEvent {
/** A simple server that implements Flume's Avro protocol. */
private[streaming]
-class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
- override def append(event : AvroFlumeEvent) : Status = {
+class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event: AvroFlumeEvent): Status = {
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
- override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a087474d3..54d8c8b03f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,11 +58,11 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
- ) extends InputDStream[R](ssc_) with Logging {
+ ) extends InputDStream[R](_ssc) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 67f2360896..89d1811c99 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,12 +48,12 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
+ ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 80e2df62de..7b9aee39ff 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -50,7 +50,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
private var ssc: StreamingContext = _
private var tempDirectory: File = null
- override def beforeAll() : Unit = {
+ override def beforeAll(): Unit = {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 116c170489..079bd8a9a8 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class MQTTInputDStream(
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](ssc_) {
+ ) extends ReceiverInputDStream[String](_ssc) {
private[streaming] override def name: String = s"MQTT stream [$id]"
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index a48eec70b9..bdd57fdde3 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,11 +39,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class TwitterInputDStream(
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](_ssc) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0d5f938d9e..4206d1fada 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -135,6 +135,18 @@ object MimaExcludes {
) ++ Seq(
// SPARK-12510 Refactor ActorReceiver to support Java
ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
+ ) ++ Seq(
+ // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":")
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkSink.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log__="),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
)
case v if v.startsWith("1.6") =>
Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 86f01d2168..298cdc05ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -183,7 +183,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
- private var fs_ : FileSystem = _
+ private var _fs: FileSystem = _
@volatile private var latestCheckpointTime: Time = null
@@ -298,12 +298,12 @@ class CheckpointWriter(
}
private def fs = synchronized {
- if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
- fs_
+ if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+ _fs
}
private def reset() = synchronized {
- fs_ = null
+ _fs = null
}
}
@@ -370,8 +370,8 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
- extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
+ extends ObjectInputStream(_inputStream) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba509a1030..157ee92fd7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan
* of the context by `stop()` or by an exception.
*/
class StreamingContext private[streaming] (
- sc_ : SparkContext,
- cp_ : Checkpoint,
- batchDur_ : Duration
+ _sc: SparkContext,
+ _cp: Checkpoint,
+ _batchDur: Duration
) extends Logging {
/**
@@ -126,18 +126,18 @@ class StreamingContext private[streaming] (
}
- if (sc_ == null && cp_ == null) {
+ if (_sc == null && _cp == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
- private[streaming] val isCheckpointPresent = (cp_ != null)
+ private[streaming] val isCheckpointPresent = (_cp != null)
private[streaming] val sc: SparkContext = {
- if (sc_ != null) {
- sc_
+ if (_sc != null) {
+ _sc
} else if (isCheckpointPresent) {
- SparkContext.getOrCreate(cp_.createSparkConf())
+ SparkContext.getOrCreate(_cp.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
@@ -154,13 +154,13 @@ class StreamingContext private[streaming] (
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
- cp_.graph.setContext(this)
- cp_.graph.restoreCheckpointData()
- cp_.graph
+ _cp.graph.setContext(this)
+ _cp.graph.restoreCheckpointData()
+ _cp.graph
} else {
- require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
+ require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
- newGraph.setBatchDuration(batchDur_)
+ newGraph.setBatchDuration(_batchDur)
newGraph
}
}
@@ -169,15 +169,15 @@ class StreamingContext private[streaming] (
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(cp_.checkpointDir)
- cp_.checkpointDir
+ sc.setCheckpointDir(_cp.checkpointDir)
+ _cp.checkpointDir
} else {
null
}
}
private[streaming] val checkpointDuration: Duration = {
- if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+ if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
}
private[streaming] val scheduler = new JobScheduler(this)
@@ -246,7 +246,7 @@ class StreamingContext private[streaming] (
}
private[streaming] def initialCheckpoint: Checkpoint = {
- if (isCheckpointPresent) cp_ else null
+ if (isCheckpointPresent) _cp else null
}
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
@@ -460,7 +460,7 @@ class StreamingContext private[streaming] (
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
- val conf = sc_.hadoopConfiguration
+ val conf = _sc.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 733147f63e..a791a474c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index 695384deb3..b5f86fe779 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
- extends InputDStream[T](ssc_) {
+class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
+ extends InputDStream[T](_ssc) {
require(rdd != null,
"parameter rdd null is illegal, which will lead to NPE in the following transformation")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 3eff174c2b..a9ce1131ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
- @transient private var fileSystem : FileSystem = null
+ @transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index cb5b1f252e..1c2325409b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -73,13 +73,13 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
conf: Option[Configuration] = None)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- extends InputDStream[(K, V)](ssc_) {
+ extends InputDStream[(K, V)](_ssc) {
private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
@@ -128,8 +128,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L
- @transient private var path_ : Path = null
- @transient private var fs_ : FileSystem = null
+ @transient private var _path: Path = null
+ @transient private var _fs: FileSystem = null
override def start() { }
@@ -289,17 +289,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
}
private def directoryPath: Path = {
- if (path_ == null) path_ = new Path(directory)
- path_
+ if (_path == null) _path = new Path(directory)
+ _path
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
- fs_
+ if (_fs == null) _fs = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+ _fs
}
private def reset() {
- fs_ = null
+ _fs = null
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d60f418e5c..76f6230f36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -38,10 +38,10 @@ import org.apache.spark.util.Utils
* that requires running a receiver on the worker nodes, use
* [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
+abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+ extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 2442e4c01a..e003ddb96c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -24,8 +24,8 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
- receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
+ _ssc: StreamingContext,
+ receiver: Receiver[T]) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
receiver
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index ac73dca05a..409c565380 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
+ ) extends ReceiverInputDStream[T](_ssc) with Logging {
def getReceiver(): Receiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 565b137228..49d8f14f4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -35,11 +35,11 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
* define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
- extends InputDStream[T](ssc_) {
+abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
+ extends InputDStream[T](_ssc) {
/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e70fc87c39..4414774791 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -31,12 +31,12 @@ import org.apache.spark.util.NextIterator
private[streaming]
class SocketInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_) {
+ ) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebbe139a2c..fedffb2395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
preservePartitioning: Boolean,
- initialRDD : Option[RDD[(K, S)]]
+ initialRDD: Option[RDD[(K, S)]]
) extends DStream[(K, S)](parent.ssc) {
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -43,7 +43,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
override val mustCheckpoint = true
private [this] def computeUsingPreviousRDD (
- parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
+ parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
@@ -98,7 +98,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
+ val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 639f4259e2..3376cd557d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -108,7 +108,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
def onStop()
/** Override this to specify a preferred location (hostname). */
- def preferredLocation : Option[String] = None
+ def preferredLocation: Option[String] = None
/**
* Store a single item of received data to Spark's memory.
@@ -257,11 +257,11 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
private var id: Int = -1
/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
- @transient private var _supervisor : ReceiverSupervisor = null
+ @transient private var _supervisor: ReceiverSupervisor = null
/** Set the ID of the DStream that this receiver is associated with. */
- private[streaming] def setReceiverId(id_ : Int) {
- id = id_
+ private[streaming] def setReceiverId(_id: Int) {
+ id = _id
}
/** Attach Network Receiver executor to this receiver. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9d296c6d3e..25e7ae8262 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -186,7 +186,7 @@ class BasicOperationsSuite extends TestSuiteBase {
val output = Seq(1 to 8, 101 to 108, 201 to 208)
testOperation(
input,
- (s: DStream[Int]) => s.union(s.map(_ + 4)) ,
+ (s: DStream[Int]) => s.union(s.map(_ + 4)),
output
)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4d04138da0..4a6b91fbc7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, ManualClock, MutableURLClassLoader, ResetSy
* A input stream that records the times of restore() invoked
*/
private[streaming]
-class CheckpointInputDStream(ssc_ : StreamingContext) extends InputDStream[Int](ssc_) {
+class CheckpointInputDStream(_ssc: StreamingContext) extends InputDStream[Int](_ssc) {
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
override def start(): Unit = { }
override def stop(): Unit = { }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 4e56dfbd42..7bbbdebd9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -200,12 +200,12 @@ object MasterFailureTest extends Logging {
* the last expected output is generated. Finally, return
*/
private def runStreams[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
): Seq[T] = {
- var ssc = ssc_
+ var ssc = _ssc
var totalTimeRan = 0L
var isLastOutputGenerated = false
var isTimedOut = false
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
index da0430e263..7a76cafc9a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StateMapSuite.scala
@@ -280,7 +280,7 @@ class StateMapSuite extends SparkFunSuite {
testSerialization(new KryoSerializer(conf), map, msg)
}
- private def testSerialization[T : ClassTag](
+ private def testSerialization[T: ClassTag](
serializer: Serializer,
map: OpenHashMapBasedStateMap[T, T],
msg: String): OpenHashMapBasedStateMap[T, T] = {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 0ae4c45988..197b3d1439 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -896,7 +896,7 @@ object SlowTestReceiver {
package object testPackage extends Assertions {
def test() {
val conf = new SparkConf().setMaster("local").setAppName("CreationSite test")
- val ssc = new StreamingContext(conf , Milliseconds(100))
+ val ssc = new StreamingContext(conf, Milliseconds(100))
try {
val inputStream = ssc.receiverStream(new TestReceiver)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 54eff2b214..239b10894a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -58,8 +58,8 @@ private[streaming] class DummyInputDStream(ssc: StreamingContext) extends InputD
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
-class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
- extends InputDStream[T](ssc_) {
+class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+ extends InputDStream[T](_ssc) {
def start() {}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 3bd8d086ab..b67189fbd7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -107,8 +107,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
/** An input DStream with for testing rate controlling */
-private[streaming] class RateTestInputDStream(@transient ssc_ : StreamingContext)
- extends ReceiverInputDStream[Int](ssc_) {
+private[streaming] class RateTestInputDStream(@transient _ssc: StreamingContext)
+ extends ReceiverInputDStream[Int](_ssc) {
override def getReceiver(): Receiver[Int] = new RateTestReceiver(id)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index b5d6a24ce8..734dd93cda 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -154,7 +154,7 @@ abstract class CommonWriteAheadLogTests(
// Recover old files and generate a second set of log files
val dataToWrite2 = generateRandomData()
manualClock.advance(100000)
- writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching ,
+ writeDataUsingWriteAheadLog(testDir, dataToWrite2, closeFileAfterWrite, allowBatching,
manualClock)
val logFiles2 = getLogFilesInDirectory(testDir)
assert(logFiles2.size > logFiles1.size)