author     Reynold Xin <rxin@databricks.com>  2015-03-24 17:08:25 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-03-24 17:08:25 -0700
commit     94598653bc772e71709163db3fed4048aa7f5f75 (patch)
tree       a4285c73d7a197909039e31a2f3097f3c8968b36 /streaming
parent     6930e965e26d39fa6c26ae67a08b4c4d0368d556 (diff)
[SPARK-6428][Streaming] Added explicit types for all public methods.
Author: Reynold Xin <rxin@databricks.com>

Closes #5110 from rxin/streaming-explicit-type and squashes the following commits:

2c2db32 [Reynold Xin] [SPARK-6428][Streaming] Added explicit types for all public methods.
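For context, the change is mechanical: public methods whose result types were previously inferred by the compiler now declare them explicitly. The stand-alone Scala sketch below is illustrative only and is not part of the patch; the Interval class here is a simplified stand-in, not Spark code. It shows the before/after pattern the commit applies throughout the streaming module.

case class Interval(beginMs: Long, endMs: Long) {
  // Before this change the result type was inferred:
  //   def duration = endMs - beginMs
  // After it, the contract is stated explicitly in the source:
  def duration: Long = endMs - beginMs

  override def toString: String = "[" + beginMs + ", " + endMs + "]"
}

object ExplicitTypesExample {
  def main(args: Array[String]): Unit = {
    val interval = Interval(0L, 1000L)
    println(interval)          // prints [0, 1000]
    println(interval.duration) // prints 1000
  }
}

The benefit of the explicit form is API stability: a later edit to a method body cannot silently change the inferred result type of a public method, and the declared type documents the contract at the definition site.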
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Duration.scala | 15
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Interval.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 15
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 20
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 33
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 14
38 files changed, 127 insertions(+), 114 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index db64e11e16..f73b463d07 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -67,12 +67,12 @@ object Checkpoint extends Logging {
val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
/** Get the checkpoint file for the given checkpoint time */
- def checkpointFile(checkpointDir: String, checkpointTime: Time) = {
+ def checkpointFile(checkpointDir: String, checkpointTime: Time): Path = {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds)
}
/** Get the checkpoint backup file for the given checkpoint time */
- def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = {
+ def checkpointBackupFile(checkpointDir: String, checkpointTime: Time): Path = {
new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
}
@@ -232,6 +232,8 @@ object CheckpointReader extends Logging {
def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
{
val checkpointPath = new Path(checkpointDir)
+
+ // TODO(rxin): Why is this a def?!
def fs = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0e285d6088..175140481e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -100,11 +100,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- def getInputStreams() = this.synchronized { inputStreams.toArray }
+ def getInputStreams(): Array[InputDStream[_]] = this.synchronized { inputStreams.toArray }
- def getOutputStreams() = this.synchronized { outputStreams.toArray }
+ def getOutputStreams(): Array[DStream[_]] = this.synchronized { outputStreams.toArray }
- def getReceiverInputStreams() = this.synchronized {
+ def getReceiverInputStreams(): Array[ReceiverInputDStream[_]] = this.synchronized {
inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
.map(_.asInstanceOf[ReceiverInputDStream[_]])
.toArray
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
index a0d8fb5ab9..3249bb3489 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
@@ -55,7 +55,6 @@ case class Duration (private val millis: Long) {
def div(that: Duration): Double = this / that
-
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.millis == 0)
@@ -71,7 +70,7 @@ case class Duration (private val millis: Long) {
def milliseconds: Long = millis
- def prettyPrint = Utils.msDurationToString(millis)
+ def prettyPrint: String = Utils.msDurationToString(millis)
}
@@ -80,7 +79,7 @@ case class Duration (private val millis: Long) {
* a given number of milliseconds.
*/
object Milliseconds {
- def apply(milliseconds: Long) = new Duration(milliseconds)
+ def apply(milliseconds: Long): Duration = new Duration(milliseconds)
}
/**
@@ -88,7 +87,7 @@ object Milliseconds {
* a given number of seconds.
*/
object Seconds {
- def apply(seconds: Long) = new Duration(seconds * 1000)
+ def apply(seconds: Long): Duration = new Duration(seconds * 1000)
}
/**
@@ -96,7 +95,7 @@ object Seconds {
* a given number of minutes.
*/
object Minutes {
- def apply(minutes: Long) = new Duration(minutes * 60000)
+ def apply(minutes: Long): Duration = new Duration(minutes * 60000)
}
// Java-friendlier versions of the objects above.
@@ -107,16 +106,16 @@ object Durations {
/**
* @return [[org.apache.spark.streaming.Duration]] representing given number of milliseconds.
*/
- def milliseconds(milliseconds: Long) = Milliseconds(milliseconds)
+ def milliseconds(milliseconds: Long): Duration = Milliseconds(milliseconds)
/**
* @return [[org.apache.spark.streaming.Duration]] representing given number of seconds.
*/
- def seconds(seconds: Long) = Seconds(seconds)
+ def seconds(seconds: Long): Duration = Seconds(seconds)
/**
* @return [[org.apache.spark.streaming.Duration]] representing given number of minutes.
*/
- def minutes(minutes: Long) = Minutes(minutes)
+ def minutes(minutes: Long): Duration = Minutes(minutes)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index ad4f3fdd14..3f5be785e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -39,18 +39,18 @@ class Interval(val beginTime: Time, val endTime: Time) {
this.endTime < that.endTime
}
- def <= (that: Interval) = (this < that || this == that)
+ def <= (that: Interval): Boolean = (this < that || this == that)
- def > (that: Interval) = !(this <= that)
+ def > (that: Interval): Boolean = !(this <= that)
- def >= (that: Interval) = !(this < that)
+ def >= (that: Interval): Boolean = !(this < that)
- override def toString = "[" + beginTime + ", " + endTime + "]"
+ override def toString: String = "[" + beginTime + ", " + endTime + "]"
}
private[streaming]
object Interval {
- def currentInterval(duration: Duration): Interval = {
+ def currentInterval(duration: Duration): Interval = {
val time = new Time(System.currentTimeMillis)
val intervalBegin = time.floor(duration)
new Interval(intervalBegin, intervalBegin + duration)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 543224d4b0..f57f295874 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -188,7 +188,7 @@ class StreamingContext private[streaming] (
/**
* Return the associated Spark context
*/
- def sparkContext = sc
+ def sparkContext: SparkContext = sc
/**
* Set each DStreams in this context to remember RDDs it generated in the last given duration.
@@ -596,7 +596,8 @@ object StreamingContext extends Logging {
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
- (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
+ : PairDStreamFunctions[K, V] = {
DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2eabdd9387..73030e15c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -415,8 +415,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
implicit val cmv2: ClassTag[V2] = fakeClassTag
implicit val cmw: ClassTag[W] = fakeClassTag
- def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = {
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ }
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 7053f47ec6..4c28654ef6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -176,11 +176,11 @@ private[python] abstract class PythonDStream(
val func = new TransformFunction(pfunc)
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
- val asJavaDStream = JavaDStream.fromDStream(this)
+ val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}
/**
@@ -212,7 +212,7 @@ private[python] class PythonTransformed2DStream(
val func = new TransformFunction(pfunc)
- override def dependencies = List(parent, parent2)
+ override def dependencies: List[DStream[_]] = List(parent, parent2)
override def slideDuration: Duration = parent.slideDuration
@@ -223,7 +223,7 @@ private[python] class PythonTransformed2DStream(
func(Some(rdd1), Some(rdd2), validTime)
}
- val asJavaDStream = JavaDStream.fromDStream(this)
+ val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}
/**
@@ -260,12 +260,15 @@ private[python] class PythonReducedWindowedDStream(
extends PythonDStream(parent, preduceFunc) {
super.persist(StorageLevel.MEMORY_ONLY)
- override val mustCheckpoint = true
- val invReduceFunc = new TransformFunction(pinvReduceFunc)
+ override val mustCheckpoint: Boolean = true
+
+ val invReduceFunc: TransformFunction = new TransformFunction(pinvReduceFunc)
def windowDuration: Duration = _windowDuration
+
override def slideDuration: Duration = _slideDuration
+
override def parentRememberDuration: Duration = rememberDuration + windowDuration
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index b874f561c1..795c5aa6d5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -104,7 +104,7 @@ abstract class DStream[T: ClassTag] (
private[streaming] def parentRememberDuration = rememberDuration
/** Return the StreamingContext associated with this DStream */
- def context = ssc
+ def context: StreamingContext = ssc
/* Set the creation call site */
private[streaming] val creationSite = DStream.getCreationSite()
@@ -619,14 +619,16 @@ abstract class DStream[T: ClassTag] (
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int) {
- def foreachFunc = (rdd: RDD[T], time: Time) => {
- val firstNum = rdd.take(num + 1)
- println ("-------------------------------------------")
- println ("Time: " + time)
- println ("-------------------------------------------")
- firstNum.take(num).foreach(println)
- if (firstNum.size > num) println("...")
- println()
+ def foreachFunc: (RDD[T], Time) => Unit = {
+ (rdd: RDD[T], time: Time) => {
+ val firstNum = rdd.take(num + 1)
+ println("-------------------------------------------")
+ println("Time: " + time)
+ println("-------------------------------------------")
+ firstNum.take(num).foreach(println)
+ if (firstNum.size > num) println("...")
+ println()
+ }
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 0dc72790fb..39fd213428 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
}
- override def toString() = {
+ override def toString: String = {
"[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
currentCheckpointFiles.mkString("\n") + "\n]"
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 22de8c02e6..66d519171f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -298,7 +298,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
private[streaming]
class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
+ private def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
override def update(time: Time) {
hadoopFiles.clear()
@@ -320,7 +320,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
}
}
- override def toString() = {
+ override def toString: String = {
"[\n" + hadoopFiles.size + " file sets\n" +
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index c81534ae58..fcd5216f10 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -27,7 +27,7 @@ class FilteredDStream[T: ClassTag](
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 6586234554..9d09a3baf3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -28,7 +28,7 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index c7bb2833ea..475ea2d2d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -27,7 +27,7 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 1361c30395..685a32e1d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -28,7 +28,7 @@ class ForEachDStream[T: ClassTag] (
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index a9bb51f054..dbb295fe54 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -25,7 +25,7 @@ private[streaming]
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index aa1993f058..e652702e21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -61,7 +61,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
}
}
- override def dependencies = List()
+ override def dependencies: List[DStream[_]] = List()
override def slideDuration: Duration = {
if (ssc == null) throw new Exception("ssc is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 3d8ee29df1..5994bc1e23 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -28,7 +28,7 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
preservePartitioning: Boolean
) extends DStream[U](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 7aea1f945d..954d2eb4a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -28,7 +28,7 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index 02704a8d1c..fa14b2e897 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -27,7 +27,7 @@ class MappedDStream[T: ClassTag, U: ClassTag] (
mapFunc: T => U
) extends DStream[U](parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index c0a5af0b65..1385ccbf56 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -52,7 +52,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
// Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
- val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+ private val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
// Persist RDDs to memory by default as these RDDs are going to be reused.
super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -60,7 +60,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
def windowDuration: Duration = _windowDuration
- override def dependencies = List(reducedStream)
+ override def dependencies: List[DStream[_]] = List(reducedStream)
override def slideDuration: Duration = _slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 880a89bc36..7757ccac09 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -33,7 +33,7 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
mapSideCombine: Boolean = true
) extends DStream[(K,C)] (parent.ssc) {
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebb04dd35b..de8718d0a8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -36,7 +36,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
super.persist(StorageLevel.MEMORY_ONLY_SER)
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 71b61856e2..5d46ca0715 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -32,7 +32,7 @@ class TransformedDStream[U: ClassTag] (
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
- override def dependencies = parents.toList
+ override def dependencies: List[DStream[_]] = parents.toList
override def slideDuration: Duration = parents.head.slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index abbc40befa..9405dbaa12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -33,17 +33,17 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
require(parents.map(_.slideDuration).distinct.size == 1,
"Some of the DStreams have different slide durations")
- override def dependencies = parents.toList
+ override def dependencies: List[DStream[_]] = parents.toList
override def slideDuration: Duration = parents.head.slideDuration
override def compute(validTime: Time): Option[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
- parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ parents.map(_.getOrCompute(validTime)).foreach {
case Some(rdd) => rdds += rdd
case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
+ validTime)
- })
+ }
if (rdds.size > 0) {
Some(new UnionRDD(ssc.sc, rdds))
} else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 775b6bfd06..899865a906 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -46,7 +46,7 @@ class WindowedDStream[T: ClassTag](
def windowDuration: Duration = _windowDuration
- override def dependencies = List(parent)
+ override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = _slideDuration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index dd1e963349..93caa4ba35 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -117,8 +117,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
- def segmentLocations = HdfsUtils.getFileSegmentLocations(
- partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
- blockLocations.getOrElse(segmentLocations)
+ blockLocations.getOrElse(
+ HdfsUtils.getFileSegmentLocations(
+ partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index a7d63bd4f2..cd309788a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.receiver
+import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
@@ -25,10 +26,10 @@ import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StorageLevel
-import java.nio.ByteBuffer
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -149,13 +150,13 @@ private[streaming] class ActorReceiver[T: ClassTag](
class Supervisor extends Actor {
override val supervisorStrategy = receiverSupervisorStrategy
- val worker = context.actorOf(props, name)
+ private val worker = context.actorOf(props, name)
logInfo("Started receiver worker at:" + worker.path)
- val n: AtomicInteger = new AtomicInteger(0)
- val hiccups: AtomicInteger = new AtomicInteger(0)
+ private val n: AtomicInteger = new AtomicInteger(0)
+ private val hiccups: AtomicInteger = new AtomicInteger(0)
- def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case IteratorData(iterator) =>
logDebug("received iterator")
@@ -189,13 +190,12 @@ private[streaming] class ActorReceiver[T: ClassTag](
}
}
- def onStart() = {
+ def onStart(): Unit = {
supervisor
logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
-
}
- def onStop() = {
+ def onStop(): Unit = {
supervisor ! PoisonPill
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index ee5e639b26..42514d8b47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -120,7 +120,7 @@ private[streaming] class BlockGenerator(
* `BlockGeneratorListener.onAddData` callback will be called. All received data items
* will be periodically pushed into BlockManager.
*/
- def addDataWithCallback(data: Any, metadata: Any) = synchronized {
+ def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
waitToPush()
currentBuffer += data
listener.onAddData(data, metadata)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 5acf8a9a81..5b5a3fe648 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -245,7 +245,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* Get the unique identifier the receiver input stream that this
* receiver is associated with.
*/
- def streamId = id
+ def streamId: Int = id
/*
* =================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 1f0244c251..4943f29395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -162,13 +162,13 @@ private[streaming] abstract class ReceiverSupervisor(
}
/** Check if receiver has been marked for stopping */
- def isReceiverStarted() = {
+ def isReceiverStarted(): Boolean = {
logDebug("state = " + receiverState)
receiverState == Started
}
/** Check if receiver has been marked for stopping */
- def isReceiverStopped() = {
+ def isReceiverStopped(): Boolean = {
logDebug("state = " + receiverState)
receiverState == Stopped
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 7d29ed88cf..8f2f1fef76 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Await
-import akka.actor.{Actor, Props}
+import akka.actor.{ActorRef, Actor, Props}
import akka.pattern.ask
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
@@ -83,7 +83,7 @@ private[streaming] class ReceiverSupervisorImpl(
private val actor = env.actorSystem.actorOf(
Props(new Actor {
- override def receive() = {
+ override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
stop("Stopped by driver", None)
@@ -92,7 +92,7 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
}
- def ref = self
+ def ref: ActorRef = self
}), "Receiver-" + streamId + "-" + System.currentTimeMillis())
/** Unique block ids if one wants to add blocks directly */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7e0f6b2cdf..30cf87f5b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -36,5 +36,5 @@ class Job(val time: Time, func: () => _) {
id = "streaming job " + time + "." + number
}
- override def toString = id
+ override def toString: String = id
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 59488dfb0f..4946806d2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -82,7 +82,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
if (eventActor != null) return // generator has already been started
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
@@ -111,8 +111,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val pollTime = 100
// To prevent graceful stop to get stuck permanently
- def hasTimedOut = {
- val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
+ def hasTimedOut: Boolean = {
+ val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
if (timedOut) {
logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
}
@@ -133,7 +133,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Stopped generation timer")
// Wait for the jobs to complete and checkpoints to be written
- def haveAllBatchesBeenProcessed = {
+ def haveAllBatchesBeenProcessed: Boolean = {
lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
}
logInfo("Waiting for jobs to be processed and checkpoints to be written")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 60bc099b27..d6a93acbe7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -56,7 +56,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
logDebug("Starting JobScheduler")
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 8c15a75b1b..5b134877d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,8 +28,7 @@ private[streaming]
case class JobSet(
time: Time,
jobs: Seq[Job],
- receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
- ) {
+ receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty) {
private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -48,17 +47,17 @@ case class JobSet(
if (hasCompleted) processingEndTime = System.currentTimeMillis()
}
- def hasStarted = processingStartTime > 0
+ def hasStarted: Boolean = processingStartTime > 0
- def hasCompleted = incompleteJobs.isEmpty
+ def hasCompleted: Boolean = incompleteJobs.isEmpty
// Time taken to process all the jobs from the time they started processing
// (i.e. not including the time they wait in the streaming scheduler queue)
- def processingDelay = processingEndTime - processingStartTime
+ def processingDelay: Long = processingEndTime - processingStartTime
// Time taken to process all the jobs from the time they were submitted
// (i.e. including the time they wait in the streaming scheduler queue)
- def totalDelay = {
+ def totalDelay: Long = {
processingEndTime - time.milliseconds
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b36aeb341d..9890047313 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -72,7 +72,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private var actor: ActorRef = null
/** Start the actor and receiver execution thread. */
- def start() = synchronized {
+ def start(): Unit = synchronized {
if (actor != null) {
throw new SparkException("ReceiverTracker already started")
}
@@ -86,7 +86,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
/** Stop the receiver execution thread. */
- def stop(graceful: Boolean) = synchronized {
+ def stop(graceful: Boolean): Unit = synchronized {
if (!receiverInputStreams.isEmpty && actor != null) {
// First, stop the receivers
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
@@ -201,7 +201,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Actor to receive messages from the receivers. */
private class ReceiverTrackerActor extends Actor {
- def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
@@ -244,16 +244,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
if (graceful) {
val pollTime = 100
- def done = { receiverInfo.isEmpty && !running }
logInfo("Waiting for receiver job to terminate gracefully")
- while(!done) {
+ while (receiverInfo.nonEmpty || running) {
Thread.sleep(pollTime)
}
logInfo("Waited for receiver job to terminate gracefully")
}
// Check if all the receivers have been deregistered or not
- if (!receiverInfo.isEmpty) {
+ if (receiverInfo.nonEmpty) {
logWarning("Not all of the receivers have deregistered, " + receiverInfo)
} else {
logInfo("All of the receivers have deregistered successfully")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 5ee53a5c5f..e4bd067cac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,9 +17,10 @@
package org.apache.spark.streaming.ui
+import scala.collection.mutable.{Queue, HashMap}
+
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.scheduler._
-import scala.collection.mutable.{Queue, HashMap}
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
@@ -59,11 +60,13 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
- runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+ synchronized {
+ runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+ }
}
- override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+ override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
@@ -72,19 +75,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
- waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
- completedaBatchInfos.enqueue(batchCompleted.batchInfo)
- if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
- totalCompletedBatches += 1L
-
- batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
- totalProcessedRecords += infos.map(_.numRecords).sum
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+ synchronized {
+ waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+ completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+ if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+ totalCompletedBatches += 1L
+
+ batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+ totalProcessedRecords += infos.map(_.numRecords).sum
+ }
}
}
- def numReceivers = synchronized {
+ def numReceivers: Int = synchronized {
ssc.graph.getReceiverInputStreams().size
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index a73d6f3bf0..4d968f8bfa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -18,9 +18,7 @@
package org.apache.spark.streaming.util
import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
import org.apache.spark.util.collection.OpenHashMap
-import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
@@ -71,7 +69,7 @@ object RawTextHelper {
var count = 0
while(data.hasNext) {
- value = data.next
+ value = data.next()
if (value != null) {
count += 1
if (len == 0) {
@@ -108,9 +106,13 @@ object RawTextHelper {
}
}
- def add(v1: Long, v2: Long) = (v1 + v2)
+ def add(v1: Long, v2: Long): Long = {
+ v1 + v2
+ }
- def subtract(v1: Long, v2: Long) = (v1 - v2)
+ def subtract(v1: Long, v2: Long): Long = {
+ v1 - v2
+ }
- def max(v1: Long, v2: Long) = math.max(v1, v2)
+ def max(v1: Long, v2: Long): Long = math.max(v1, v2)
}