diff options
author | Reynold Xin <rxin@databricks.com> | 2015-04-03 01:25:02 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-03 01:25:02 -0700 |
commit | 82701ee25fda64f03899713bc56f82ca6f278151 (patch) | |
tree | 07fba36d66228f7561bd65dd502fd668d50a9be5 /external/kafka | |
parent | c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 (diff) | |
download | spark-82701ee25fda64f03899713bc56f82ca6f278151.tar.gz spark-82701ee25fda64f03899713bc56f82ca6f278151.tar.bz2 spark-82701ee25fda64f03899713bc56f82ca6f278151.zip |
[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on the explicit type checking in scalastyle.
Author: Reynold Xin <rxin@databricks.com>
Closes #5342 from rxin/SPARK-6428 and squashes the following commits:
7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
Diffstat (limited to 'external/kafka')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 5 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 4 |
2 files changed, 5 insertions, 4 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 04e65cb3d7..1b1fc8051d 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -129,8 +129,9 @@ class DirectKafkaInputDStream[ private[streaming] class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def batchForTime = data.asInstanceOf[mutable.HashMap[ - Time, Array[OffsetRange.OffsetRangeTuple]]] + def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { + data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] + } override def update(time: Time) { batchForTime.clear() diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 6d465bcb6b..4a83b715fa 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -155,7 +155,7 @@ class KafkaRDD[ .dropWhile(_.offset < requestOffset) } - override def close() = consumer.close() + override def close(): Unit = consumer.close() override def getNext(): R = { if (iter == null || !iter.hasNext) { @@ -207,7 +207,7 @@ object KafkaRDD { fromOffsets: Map[TopicAndPartition, Long], untilOffsets: Map[TopicAndPartition, LeaderOffset], messageHandler: MessageAndMetadata[K, V] => R - ): KafkaRDD[K, V, U, T, R] = { + ): KafkaRDD[K, V, U, T, R] = { val leaders = untilOffsets.map { case (tp, lo) => tp -> (lo.host, lo.port) }.toMap |