author     Kousuke Saruta <sarutak@oss.nttdata.co.jp>    2016-01-11 21:06:22 -0800
committer  Reynold Xin <rxin@databricks.com>             2016-01-11 21:06:22 -0800
commit     39ae04e6b714e085a1341aa84d8fc5fc827d5f35 (patch)
tree       98f9bf78a4309c4c4cd061d4ee0a9f4621a5813d /streaming/src/main
parent     aaa2c3b628319178ca1f3f68966ff253c2de49cb (diff)
[SPARK-12692][BUILD][STREAMING] Scala style: Fix the style violation (Space before "," or ":")
Fix the style violation (space before "," and ":"). This PR is a follow-up to #10643.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #10685 from sarutak/SPARK-12692-followup-streaming.
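Aside (not part of the patch): a minimal, hypothetical Scala sketch of why the old names forced the style violation. A Scala identifier that ends in an underscore absorbs a following operator character, so "fs_: FileSystem" would parse as the single identifier "fs_:"; the old code therefore needed a space before the colon ("fs_ : FileSystem"), which the style checker flags. Renaming to a leading underscore, as this commit does, removes the need for that space. The object and member names below are invented for illustration and do not appear in the diff.

object UnderscoreNamingSketch {
  // Old convention -- the trailing underscore forces a space before ':' to compile:
  //   private var fs_ : String = _
  // New convention -- a leading underscore keeps the identifier and ':' separate:
  private var _fs: String = _

  // The same rule bans spaces before ',' and ':' in parameter lists:
  //   def count(window : Int , slide : Int) : Int    // flagged by the style check
  def count(window: Int, slide: Int): Int = window + slide

  def main(args: Array[String]): Unit = {
    _fs = "hdfs://checkpoint-dir"
    println(s"${count(3, 1)} ${_fs}")
  }
}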
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                          12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                    36
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala             2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala         4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala        2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala            18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala                 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala        4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala              4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala         6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala           4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala                 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala                    8
13 files changed, 56 insertions(+), 56 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 86f01d2168..298cdc05ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -183,7 +183,7 @@ class CheckpointWriter(
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
- private var fs_ : FileSystem = _
+ private var _fs: FileSystem = _
@volatile private var latestCheckpointTime: Time = null
@@ -298,12 +298,12 @@ class CheckpointWriter(
}
private def fs = synchronized {
- if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf)
- fs_
+ if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+ _fs
}
private def reset() = synchronized {
- fs_ = null
+ _fs = null
}
}
@@ -370,8 +370,8 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
- extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
+ extends ObjectInputStream(_inputStream) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba509a1030..157ee92fd7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan
* of the context by `stop()` or by an exception.
*/
class StreamingContext private[streaming] (
- sc_ : SparkContext,
- cp_ : Checkpoint,
- batchDur_ : Duration
+ _sc: SparkContext,
+ _cp: Checkpoint,
+ _batchDur: Duration
) extends Logging {
/**
@@ -126,18 +126,18 @@ class StreamingContext private[streaming] (
}
- if (sc_ == null && cp_ == null) {
+ if (_sc == null && _cp == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
- private[streaming] val isCheckpointPresent = (cp_ != null)
+ private[streaming] val isCheckpointPresent = (_cp != null)
private[streaming] val sc: SparkContext = {
- if (sc_ != null) {
- sc_
+ if (_sc != null) {
+ _sc
} else if (isCheckpointPresent) {
- SparkContext.getOrCreate(cp_.createSparkConf())
+ SparkContext.getOrCreate(_cp.createSparkConf())
} else {
throw new SparkException("Cannot create StreamingContext without a SparkContext")
}
@@ -154,13 +154,13 @@ class StreamingContext private[streaming] (
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
- cp_.graph.setContext(this)
- cp_.graph.restoreCheckpointData()
- cp_.graph
+ _cp.graph.setContext(this)
+ _cp.graph.restoreCheckpointData()
+ _cp.graph
} else {
- require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
+ require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
- newGraph.setBatchDuration(batchDur_)
+ newGraph.setBatchDuration(_batchDur)
newGraph
}
}
@@ -169,15 +169,15 @@ class StreamingContext private[streaming] (
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(cp_.checkpointDir)
- cp_.checkpointDir
+ sc.setCheckpointDir(_cp.checkpointDir)
+ _cp.checkpointDir
} else {
null
}
}
private[streaming] val checkpointDuration: Duration = {
- if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
+ if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration
}
private[streaming] val scheduler = new JobScheduler(this)
@@ -246,7 +246,7 @@ class StreamingContext private[streaming] (
}
private[streaming] def initialCheckpoint: Checkpoint = {
- if (isCheckpointPresent) cp_ else null
+ if (isCheckpointPresent) _cp else null
}
private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
@@ -460,7 +460,7 @@ class StreamingContext private[streaming] (
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
- val conf = sc_.hadoopConfiguration
+ val conf = _sc.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 733147f63e..a791a474c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index 695384deb3..b5f86fe779 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
-class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
- extends InputDStream[T](ssc_) {
+class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
+ extends InputDStream[T](_ssc) {
require(rdd != null,
"parameter rdd null is illegal, which will lead to NPE in the following transformation")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 3eff174c2b..a9ce1131ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
- @transient private var fileSystem : FileSystem = null
+ @transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index cb5b1f252e..1c2325409b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -73,13 +73,13 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
conf: Option[Configuration] = None)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- extends InputDStream[(K, V)](ssc_) {
+ extends InputDStream[(K, V)](_ssc) {
private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
@@ -128,8 +128,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L
- @transient private var path_ : Path = null
- @transient private var fs_ : FileSystem = null
+ @transient private var _path: Path = null
+ @transient private var _fs: FileSystem = null
override def start() { }
@@ -289,17 +289,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
}
private def directoryPath: Path = {
- if (path_ == null) path_ = new Path(directory)
- path_
+ if (_path == null) _path = new Path(directory)
+ _path
}
private def fs: FileSystem = {
- if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
- fs_
+ if (_fs == null) _fs = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+ _fs
}
private def reset() {
- fs_ = null
+ _fs = null
}
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d60f418e5c..76f6230f36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -38,10 +38,10 @@ import org.apache.spark.util.Utils
* that requires running a receiver on the worker nodes, use
* [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
*
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
- extends DStream[T](ssc_) {
+abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+ extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 2442e4c01a..e003ddb96c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -24,8 +24,8 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
- receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
+ _ssc: StreamingContext,
+ receiver: Receiver[T]) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
receiver
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index ac73dca05a..409c565380 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_ ) with Logging {
+ ) extends ReceiverInputDStream[T](_ssc) with Logging {
def getReceiver(): Receiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 565b137228..49d8f14f4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -35,11 +35,11 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils
* define [[getReceiver]] function that gets the receiver object of type
* [[org.apache.spark.streaming.receiver.Receiver]] that will be sent
* to the workers to receive data.
- * @param ssc_ Streaming context that will execute this input stream
+ * @param _ssc Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
- extends InputDStream[T](ssc_) {
+abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
+ extends InputDStream[T](_ssc) {
/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index e70fc87c39..4414774791 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -31,12 +31,12 @@ import org.apache.spark.util.NextIterator
private[streaming]
class SocketInputDStream[T: ClassTag](
- ssc_ : StreamingContext,
+ _ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends ReceiverInputDStream[T](ssc_) {
+ ) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebbe139a2c..fedffb2395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
preservePartitioning: Boolean,
- initialRDD : Option[RDD[(K, S)]]
+ initialRDD: Option[RDD[(K, S)]]
) extends DStream[(K, S)](parent.ssc) {
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -43,7 +43,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
override val mustCheckpoint = true
private [this] def computeUsingPreviousRDD (
- parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
+ parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
@@ -98,7 +98,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => {
+ val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 639f4259e2..3376cd557d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -108,7 +108,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
def onStop()
/** Override this to specify a preferred location (hostname). */
- def preferredLocation : Option[String] = None
+ def preferredLocation: Option[String] = None
/**
* Store a single item of received data to Spark's memory.
@@ -257,11 +257,11 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
private var id: Int = -1
/** Handler object that runs the receiver. This is instantiated lazily in the worker. */
- @transient private var _supervisor : ReceiverSupervisor = null
+ @transient private var _supervisor: ReceiverSupervisor = null
/** Set the ID of the DStream that this receiver is associated with. */
- private[streaming] def setReceiverId(id_ : Int) {
- id = id_
+ private[streaming] def setReceiverId(_id: Int) {
+ id = _id
}
/** Attach Network Receiver executor to this receiver. */