aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-04-03 01:25:02 -0700
committerReynold Xin <rxin@databricks.com>2015-04-03 01:25:02 -0700
commit82701ee25fda64f03899713bc56f82ca6f278151 (patch)
tree07fba36d66228f7561bd65dd502fd668d50a9be5 /streaming
parentc42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 (diff)
downloadspark-82701ee25fda64f03899713bc56f82ca6f278151.tar.gz
spark-82701ee25fda64f03899713bc56f82ca6f278151.tar.bz2
spark-82701ee25fda64f03899713bc56f82ca6f278151.zip
[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle. Author: Reynold Xin <rxin@databricks.com> Closes #5342 from rxin/SPARK-6428 and squashes the following commits: 7b531ab [Reynold Xin] import ordering 2d9a8a5 [Reynold Xin] jl e668b1c [Reynold Xin] override 9b9e119 [Reynold Xin] Parenthesis. 82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala2
5 files changed, 16 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f73b463d07..28703ef812 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -234,7 +234,7 @@ object CheckpointReader extends Logging {
val checkpointPath = new Path(checkpointDir)
// TODO(rxin): Why is this a def?!
- def fs = checkpointPath.getFileSystem(hadoopConf)
+ def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 73030e15c5..808dcc174c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -169,7 +169,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -179,7 +179,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
- def fn = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -190,7 +190,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[U] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -201,7 +203,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
- def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
+ (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator())
+ }
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index f94f2d0e8b..93baad19e3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -526,7 +526,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
- def fn = (x: V) => f.apply(x).asScala
+ def fn: (V) => Iterable[U] = (x: V) => f.apply(x).asScala
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.flatMapValues(fn)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index e3db01c1e1..4095a7cc84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -192,7 +192,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaReceiverInputDStream[T] = {
- def fn = (x: InputStream) => converter.call(x).toIterator
+ def fn: (InputStream) => Iterator[T] = (x: InputStream) => converter.call(x).toIterator
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
@@ -313,7 +313,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
- def fn = (x: Path) => filter.call(x).booleanValue()
+ def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
}
@@ -344,7 +344,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
- def fn = (x: Path) => filter.call(x).booleanValue()
+ def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}
@@ -625,7 +625,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
- def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+ def stop(stopSparkContext: Boolean): Unit = ssc.stop(stopSparkContext)
/**
* Stop the execution of the streams.
@@ -633,7 +633,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* @param stopGracefully Stop gracefully by waiting for the processing of all
* received data to be completed
*/
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
+ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
ssc.stop(stopSparkContext, stopGracefully)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 795c5aa6d5..24f99a2b92 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -839,7 +839,7 @@ object DStream {
/** Filtering function that excludes non-user classes for a streaming application */
def streamingExclustionFunction(className: String): Boolean = {
- def doesMatch(r: Regex) = r.findFirstIn(className).isDefined
+ def doesMatch(r: Regex): Boolean = r.findFirstIn(className).isDefined
val isSparkClass = doesMatch(SPARK_CLASS_REGEX)
val isSparkExampleClass = doesMatch(SPARK_EXAMPLES_CLASS_REGEX)
val isSparkStreamingTestClass = doesMatch(SPARK_STREAMING_TESTCLASS_REGEX)