author    NirmalReddy <nirmal_reddy2000@yahoo.com>  2014-03-26 18:24:55 -0700
committer Patrick Wendell <pwendell@gmail.com>      2014-03-26 18:24:55 -0700
commit    3e63d98f09065386901d78c141b0da93cdce0f76 (patch)
tree      00e49741d5f8bbb5c830d371fde2d98708dcab57 /streaming
parent    be6d96c15b3c31cd27bdd79fb259072479151ae6 (diff)
Spark-1095: Adding explicit return types to all public methods
Excluded those that are self-evident and the cases that are discussed in the mailing list.

Author: NirmalReddy <nirmal_reddy2000@yahoo.com>
Author: NirmalReddy <nirmal.reddy@imaginea.com>

Closes #168 from NirmalReddy/Spark-1095 and squashes the following commits:

ac54b29 [NirmalReddy] import misplaced
8c5ff3e [NirmalReddy] Changed syntax of unit returning methods
02d0778 [NirmalReddy] fixed explicit types in all the other packages
1c17773 [NirmalReddy] fixed explicit types in core package
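A minimal standalone sketch of the motivation, not part of the patch (ReturnTypeExample, jarsInferred, and jarsExplicit are illustrative names): without an annotation the Scala compiler infers the most specific type of a method body, so a public signature can silently change whenever the implementation changes; an explicit return type pins the contract.

object ReturnTypeExample {
  // Without an annotation the inferred result type is List[String]; switching the
  // body to a Vector would silently change the public signature.
  def jarsInferred = List("a.jar", "b.jar")

  // With an explicit annotation the public contract stays Seq[String] regardless of
  // which concrete collection the implementation builds.
  def jarsExplicit: Seq[String] = List("a.jar", "b.jar")

  def main(args: Array[String]): Unit = {
    println(jarsInferred)   // List(a.jar, b.jar)
    println(jarsExplicit)   // List(a.jar, b.jar)
  }
}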
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala      |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala               |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala           |  8
5 files changed, 33 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 062b888e80..e198c69470 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -431,7 +431,7 @@ class StreamingContext private[streaming] (
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
- def stop(stopSparkContext: Boolean = true) = synchronized {
+ def stop(stopSparkContext: Boolean = true): Unit = synchronized {
scheduler.stop()
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
@@ -489,7 +489,7 @@ object StreamingContext extends Logging {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
- def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+ def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls)
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a85cd04c93..bb2f492d06 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -49,7 +49,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
- def print() = dstream.print()
+ def print(): Unit = {
+ dstream.print()
+ }
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
@@ -401,7 +403,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Enable periodic checkpointing of RDDs of this DStream.
* @param interval Time interval after which generated RDD will be checkpointed
*/
- def checkpoint(interval: Duration) = {
+ def checkpoint(interval: Duration): DStream[T] = {
dstream.checkpoint(interval)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index c48d754e43..b705d2ec9a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -477,31 +477,41 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Start the execution of the streams.
*/
- def start() = ssc.start()
+ def start(): Unit = {
+ ssc.start()
+ }
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*/
- def awaitTermination() = ssc.awaitTermination()
+ def awaitTermination(): Unit = {
+ ssc.awaitTermination()
+ }
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
- def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout)
+ def awaitTermination(timeout: Long): Unit = {
+ ssc.awaitTermination(timeout)
+ }
/**
* Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
- def stop() = ssc.stop()
+ def stop(): Unit = {
+ ssc.stop()
+ }
/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
- def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+ def stop(stopSparkContext: Boolean): Unit = {
+ ssc.stop(stopSparkContext)
+ }
}
/**
@@ -579,7 +589,7 @@ object JavaStreamingContext {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
- def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
+ def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 6bff56a9d3..d48b51aa69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -503,14 +503,18 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
+ def foreach(foreachFunc: RDD[T] => Unit): Unit = {
+ this.foreachRDD(foreachFunc)
+ }
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
- def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
+ def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
+ this.foreachRDD(foreachFunc)
+ }
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 4e8d07fe92..7f3cd2f8eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,17 +39,19 @@ case class BatchInfo(
* was submitted to the streaming scheduler. Essentially, it is
* `processingStartTime` - `submissionTime`.
*/
- def schedulingDelay = processingStartTime.map(_ - submissionTime)
+ def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
/**
* Time taken for the all jobs of this batch to finish processing from the time they started
* processing. Essentially, it is `processingEndTime` - `processingStartTime`.
*/
- def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+ def processingDelay: Option[Long] = processingEndTime.zip(processingStartTime)
+ .map(x => x._1 - x._2).headOption
/**
* Time taken for all the jobs of this batch to finish processing from the time they
* were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
*/
- def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
+ def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
+ .map(x => x._1 + x._2).headOption
}
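A minimal standalone sketch of how the three delay accessors above compose, using illustrative timestamps that are not taken from the patch (BatchDelayExample and the millisecond values are assumptions):

object BatchDelayExample {
  def main(args: Array[String]): Unit = {
    // Illustrative times in milliseconds: the batch is submitted at 1000,
    // starts processing at 1200, and finishes at 1700.
    val submissionTime = 1000L
    val processingStartTime: Option[Long] = Some(1200L)
    val processingEndTime: Option[Long] = Some(1700L)

    // Same shape as BatchInfo's accessors: each delay stays None until the
    // corresponding timestamps are available.
    val schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
    val processingDelay: Option[Long] =
      processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
    val totalDelay: Option[Long] =
      schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption

    println(s"scheduling=$schedulingDelay processing=$processingDelay total=$totalDelay")
    // scheduling=Some(200) processing=Some(500) total=Some(700)
  }
}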