author     Reynold Xin <rxin@databricks.com>    2015-04-03 01:25:02 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-03 01:25:02 -0700
commit     82701ee25fda64f03899713bc56f82ca6f278151 (patch)
tree       07fba36d66228f7561bd65dd502fd668d50a9be5 /external
parent     c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 (diff)
[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle.

Author: Reynold Xin <rxin@databricks.com>

Closes #5342 from rxin/SPARK-6428 and squashes the following commits:

7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
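For context, the scalastyle rule this patch enables flags public members whose result types are left to type inference. A minimal sketch of the before/after pattern being enforced (the class and method names are illustrative, not from the patch):

class Metrics {
  // Would be flagged: the public result type (Int) is only inferred,
  // so an implementation change could silently change the API.
  def count = 42

  // Accepted: the result type is an explicit part of the contract.
  def total: Int = 42
}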
Diffstat (limited to 'external')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala       | 12
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala |  5
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala                |  4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala |  2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala        | 13
5 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2de2a7926b..60e2994431 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -37,8 +37,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.receiver.Receiver

-import org.jboss.netty.channel.ChannelPipelineFactory
-import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import org.jboss.netty.handler.codec.compression._
@@ -187,8 +186,8 @@ class FlumeReceiver(
     logInfo("Flume receiver stopped")
   }

-  override def preferredLocation = Some(host)
-
+  override def preferredLocation: Option[String] = Option(host)
+
   /** A Netty Pipeline factory that will decompress incoming data from
     * and the Netty client and compress data going back to the client.
     *
@@ -198,13 +197,12 @@ class FlumeReceiver(
     */
   private[streaming]
   class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-
-    def getPipeline() = {
+    def getPipeline(): ChannelPipeline = {
       val pipeline = Channels.pipeline()
       val encoder = new ZlibEncoder(6)
       pipeline.addFirst("deflater", encoder)
       pipeline.addFirst("inflater", new ZlibDecoder())
       pipeline
+    }
   }
 }
-}
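One behavioral detail in the hunk above: alongside the added Option[String] annotation, Some(host) became Option(host). Option.apply normalizes a null argument to None, whereas Some(null) wraps the null as a present value. A minimal, self-contained illustration (the variable name is hypothetical):

object OptionVsSome extends App {
  val host: String = null
  println(Some(host))   // prints Some(null): the null is wrapped as-is
  println(Option(host)) // prints None: Option.apply filters out null
}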
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 04e65cb3d7..1b1fc8051d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[
   private[streaming]
   class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
-    def batchForTime = data.asInstanceOf[mutable.HashMap[
-      Time, Array[OffsetRange.OffsetRangeTuple]]]
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+    }

     override def update(time: Time) {
       batchForTime.clear()
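The new signature spells out OffsetRange.OffsetRangeTuple as (String, Int, Long, Long), and the asInstanceOf cast on the HashMap is unchecked beyond the HashMap class itself because of JVM type erasure. A small sketch of that erasure caveat (the names and values are hypothetical, not from Spark):

import scala.collection.mutable

object ErasureSketch extends App {
  // Checkpoint-style storage: the static element types are forgotten.
  val data: AnyRef = mutable.HashMap("batch" -> Array(("topic", 0, 0L, 100L)))
  // Only "is this a HashMap?" is verified at runtime; the key and value
  // types in the cast are trusted until the elements are actually used.
  val typed = data.asInstanceOf[mutable.HashMap[String, Array[(String, Int, Long, Long)]]]
  println(typed("batch").head) // (topic,0,0,100)
}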
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 6d465bcb6b..4a83b715fa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -155,7 +155,7 @@ class KafkaRDD[
         .dropWhile(_.offset < requestOffset)
     }

-    override def close() = consumer.close()
+    override def close(): Unit = consumer.close()

     override def getNext(): R = {
       if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
       fromOffsets: Map[TopicAndPartition, Long],
       untilOffsets: Map[TopicAndPartition, LeaderOffset],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): KafkaRDD[K, V, U, T, R] = {
+  ): KafkaRDD[K, V, U, T, R] = {
     val leaders = untilOffsets.map { case (tp, lo) =>
       tp -> (lo.host, lo.port)
     }.toMap
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 4eacc47da5..7cf02d85d7 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -70,7 +70,7 @@ class TwitterReceiver(
     try {
       val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
       newTwitterStream.addListener(new StatusListener {
-        def onStatus(status: Status) = {
+        def onStatus(status: Status): Unit = {
           store(status)
         }
         // Unimplemented
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 554705878e..588e6bac7b 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -29,13 +29,16 @@ import org.apache.spark.streaming.receiver.ActorHelper

 /**
  * A receiver to subscribe to ZeroMQ stream.
  */
-private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
-  subscribe: Subscribe,
-  bytesToObjects: Seq[ByteString] => Iterator[T])
+private[streaming] class ZeroMQReceiver[T: ClassTag](
+    publisherUrl: String,
+    subscribe: Subscribe,
+    bytesToObjects: Seq[ByteString] => Iterator[T])
   extends Actor with ActorHelper with Logging {

-  override def preStart() = ZeroMQExtension(context.system)
-    .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+  override def preStart(): Unit = {
+    ZeroMQExtension(context.system)
+      .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
+  }

   def receive: Receive = {
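The preStart change above shows the shape the rule pushes side-effecting overrides toward: an explicit Unit result type and a braced body instead of a bare expression continuation. A minimal sketch of that shape in a plain Akka actor, assuming akka-actor on the classpath (the actor and its behavior are hypothetical, not Spark code):

import akka.actor.Actor

class SketchReceiver extends Actor {
  // Explicit Unit and a braced body, matching the style the patch
  // applies to ZeroMQReceiver.preStart.
  override def preStart(): Unit = {
    println("connecting on start")
  }

  def receive: Receive = {
    case msg => println(s"received: $msg")
  }
}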