diff options
Diffstat (limited to 'streaming/src/main/scala/org')
13 files changed, 56 insertions, 56 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 86f01d2168..298cdc05ac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -183,7 +183,7 @@ class CheckpointWriter( val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) private var stopped = false - private var fs_ : FileSystem = _ + private var _fs: FileSystem = _ @volatile private var latestCheckpointTime: Time = null @@ -298,12 +298,12 @@ class CheckpointWriter( } private def fs = synchronized { - if (fs_ == null) fs_ = new Path(checkpointDir).getFileSystem(hadoopConf) - fs_ + if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf) + _fs } private def reset() = synchronized { - fs_ = null + _fs = null } } @@ -370,8 +370,8 @@ object CheckpointReader extends Logging { } private[streaming] -class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) - extends ObjectInputStream(inputStream_) { +class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader) + extends ObjectInputStream(_inputStream) { override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ba509a1030..157ee92fd7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -58,9 +58,9 @@ import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookMan * of the context by `stop()` or by an exception. */ class StreamingContext private[streaming] ( - sc_ : SparkContext, - cp_ : Checkpoint, - batchDur_ : Duration + _sc: SparkContext, + _cp: Checkpoint, + _batchDur: Duration ) extends Logging { /** @@ -126,18 +126,18 @@ class StreamingContext private[streaming] ( } - if (sc_ == null && cp_ == null) { + if (_sc == null && _cp == null) { throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") } - private[streaming] val isCheckpointPresent = (cp_ != null) + private[streaming] val isCheckpointPresent = (_cp != null) private[streaming] val sc: SparkContext = { - if (sc_ != null) { - sc_ + if (_sc != null) { + _sc } else if (isCheckpointPresent) { - SparkContext.getOrCreate(cp_.createSparkConf()) + SparkContext.getOrCreate(_cp.createSparkConf()) } else { throw new SparkException("Cannot create StreamingContext without a SparkContext") } @@ -154,13 +154,13 @@ class StreamingContext private[streaming] ( private[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { - cp_.graph.setContext(this) - cp_.graph.restoreCheckpointData() - cp_.graph + _cp.graph.setContext(this) + _cp.graph.restoreCheckpointData() + _cp.graph } else { - require(batchDur_ != null, "Batch duration for StreamingContext cannot be null") + require(_batchDur != null, "Batch duration for StreamingContext cannot be null") val newGraph = new DStreamGraph() - newGraph.setBatchDuration(batchDur_) + newGraph.setBatchDuration(_batchDur) newGraph } } @@ -169,15 +169,15 @@ class StreamingContext private[streaming] ( private[streaming] var checkpointDir: String = { if (isCheckpointPresent) { - sc.setCheckpointDir(cp_.checkpointDir) - cp_.checkpointDir + sc.setCheckpointDir(_cp.checkpointDir) + _cp.checkpointDir } else { null } } private[streaming] val checkpointDuration: Duration = { - if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration + if (isCheckpointPresent) _cp.checkpointDuration else graph.batchDuration } private[streaming] val scheduler = new JobScheduler(this) @@ -246,7 +246,7 @@ class StreamingContext private[streaming] ( } private[streaming] def initialCheckpoint: Checkpoint = { - if (isCheckpointPresent) cp_ else null + if (isCheckpointPresent) _cp else null } private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement() @@ -460,7 +460,7 @@ class StreamingContext private[streaming] ( def binaryRecordsStream( directory: String, recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { - val conf = sc_.hadoopConfiguration + val conf = _sc.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 733147f63e..a791a474c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * of elements in a window over this DStream. windowDuration and slideDuration are as defined in * the window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = { + def countByWindow(windowDuration: Duration, slideDuration: Duration): JavaDStream[jl.Long] = { dstream.countByWindow(windowDuration, slideDuration) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala index 695384deb3..b5f86fe779 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala @@ -25,8 +25,8 @@ import org.apache.spark.streaming.{StreamingContext, Time} /** * An input stream that always returns the same RDD on each timestep. Useful for testing. */ -class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T]) - extends InputDStream[T](ssc_) { +class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T]) + extends InputDStream[T](_ssc) { require(rdd != null, "parameter rdd null is illegal, which will lead to NPE in the following transformation") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 3eff174c2b..a9ce1131ce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -39,7 +39,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) // in that batch's checkpoint data @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] - @transient private var fileSystem : FileSystem = null + @transient private var fileSystem: FileSystem = null protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index cb5b1f252e..1c2325409b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -73,13 +73,13 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti */ private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( - ssc_ : StreamingContext, + _ssc: StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, conf: Option[Configuration] = None) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) - extends InputDStream[(K, V)](ssc_) { + extends InputDStream[(K, V)](_ssc) { private val serializableConfOpt = conf.map(new SerializableConfiguration(_)) @@ -128,8 +128,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Timestamp of the last round of finding files @transient private var lastNewFileFindingTime = 0L - @transient private var path_ : Path = null - @transient private var fs_ : FileSystem = null + @transient private var _path: Path = null + @transient private var _fs: FileSystem = null override def start() { } @@ -289,17 +289,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( } private def directoryPath: Path = { - if (path_ == null) path_ = new Path(directory) - path_ + if (_path == null) _path = new Path(directory) + _path } private def fs: FileSystem = { - if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration) - fs_ + if (_fs == null) _fs = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration) + _fs } private def reset() { - fs_ = null + _fs = null } @throws(classOf[IOException]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index d60f418e5c..76f6230f36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -38,10 +38,10 @@ import org.apache.spark.util.Utils * that requires running a receiver on the worker nodes, use * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class. * - * @param ssc_ Streaming context that will execute this input stream + * @param _ssc Streaming context that will execute this input stream */ -abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext) - extends DStream[T](ssc_) { +abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext) + extends DStream[T](_ssc) { private[streaming] var lastValidTime: Time = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala index 2442e4c01a..e003ddb96c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -24,8 +24,8 @@ import org.apache.spark.streaming.receiver.Receiver private[streaming] class PluggableInputDStream[T: ClassTag]( - ssc_ : StreamingContext, - receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) { + _ssc: StreamingContext, + receiver: Receiver[T]) extends ReceiverInputDStream[T](_ssc) { def getReceiver(): Receiver[T] = { receiver diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index ac73dca05a..409c565380 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -38,11 +38,11 @@ import org.apache.spark.streaming.receiver.Receiver */ private[streaming] class RawInputDStream[T: ClassTag]( - ssc_ : StreamingContext, + _ssc: StreamingContext, host: String, port: Int, storageLevel: StorageLevel - ) extends ReceiverInputDStream[T](ssc_ ) with Logging { + ) extends ReceiverInputDStream[T](_ssc) with Logging { def getReceiver(): Receiver[T] = { new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[Receiver[T]] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 565b137228..49d8f14f4c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -35,11 +35,11 @@ import org.apache.spark.streaming.util.WriteAheadLogUtils * define [[getReceiver]] function that gets the receiver object of type * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent * to the workers to receive data. - * @param ssc_ Streaming context that will execute this input stream + * @param _ssc Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */ -abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext) - extends InputDStream[T](ssc_) { +abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext) + extends InputDStream[T](_ssc) { /** * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index e70fc87c39..4414774791 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -31,12 +31,12 @@ import org.apache.spark.util.NextIterator private[streaming] class SocketInputDStream[T: ClassTag]( - ssc_ : StreamingContext, + _ssc: StreamingContext, host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel - ) extends ReceiverInputDStream[T](ssc_) { + ) extends ReceiverInputDStream[T](_ssc) { def getReceiver(): Receiver[T] = { new SocketReceiver(host, port, bytesToObjects, storageLevel) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index ebbe139a2c..fedffb2395 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -31,7 +31,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, preservePartitioning: Boolean, - initialRDD : Option[RDD[(K, S)]] + initialRDD: Option[RDD[(K, S)]] ) extends DStream[(K, S)](parent.ssc) { super.persist(StorageLevel.MEMORY_ONLY_SER) @@ -43,7 +43,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( override val mustCheckpoint = true private [this] def computeUsingPreviousRDD ( - parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = { + parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = { // Define the function for the mapPartition operation on cogrouped RDD; // first map the cogrouped tuple to tuples of required type, // and then apply the update function @@ -98,7 +98,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // first map the grouped tuple to tuples of required type, // and then apply the update function val updateFuncLocal = updateFunc - val finalFunc = (iterator : Iterator[(K, Iterable[V])]) => { + val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => { updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None))) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala index 639f4259e2..3376cd557d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala @@ -108,7 +108,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable def onStop() /** Override this to specify a preferred location (hostname). */ - def preferredLocation : Option[String] = None + def preferredLocation: Option[String] = None /** * Store a single item of received data to Spark's memory. @@ -257,11 +257,11 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable private var id: Int = -1 /** Handler object that runs the receiver. This is instantiated lazily in the worker. */ - @transient private var _supervisor : ReceiverSupervisor = null + @transient private var _supervisor: ReceiverSupervisor = null /** Set the ID of the DStream that this receiver is associated with. */ - private[streaming] def setReceiverId(id_ : Int) { - id = id_ + private[streaming] def setReceiverId(_id: Int) { + id = _id } /** Attach Network Receiver executor to this receiver. */ |