author    Reynold Xin <rxin@databricks.com>  2015-04-03 01:25:02 -0700
committer Reynold Xin <rxin@databricks.com>  2015-04-03 01:25:02 -0700
commit    82701ee25fda64f03899713bc56f82ca6f278151 (patch)
tree      07fba36d66228f7561bd65dd502fd668d50a9be5 /external/kafka
parent    c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 (diff)
[SPARK-6428] Turn on explicit type checking for public methods.
This builds on my earlier pull requests and turns on explicit type checking for public methods in scalastyle.

Author: Reynold Xin <rxin@databricks.com>

Closes #5342 from rxin/SPARK-6428 and squashes the following commits:

7b531ab [Reynold Xin] import ordering
2d9a8a5 [Reynold Xin] jl
e668b1c [Reynold Xin] override
9b9e119 [Reynold Xin] Parenthesis.
82e0cf5 [Reynold Xin] [SPARK-6428] Turn on explicit type checking for public methods.
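As background, the scalastyle rule at issue rejects public members whose result types are inferred rather than declared. A minimal sketch of the before/after pattern the check enforces, using a hypothetical Registry class (not from this patch):

    import scala.collection.mutable

    class Registry {
      private val entries = mutable.HashMap.empty[String, Int]

      // Rejected by the check: the result type is inferred from the body
      // (mutable.HashMap[String, Int] here) and can change silently if
      // the implementation changes.
      // def snapshot = entries.clone()

      // Accepted: the public contract is written out explicitly.
      def snapshot: mutable.HashMap[String, Int] = entries.clone()
    }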
Diffstat (limited to 'external/kafka')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 5 +++--
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 04e65cb3d7..1b1fc8051d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[
 
   private[streaming]
   class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
-    def batchForTime = data.asInstanceOf[mutable.HashMap[
-      Time, Array[OffsetRange.OffsetRangeTuple]]]
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+    }
 
     override def update(time: Time) {
       batchForTime.clear()
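The annotated type and the cast target line up because OffsetRange.OffsetRangeTuple is, in this era of the code, an alias for the tuple spelled out in the new signature; the alias itself is package-private, which is presumably why the public annotation writes out the tuple instead. A sketch of the relationship (alias paraphrased from OffsetRange.scala; treat the exact access modifier as an assumption):

    object OffsetRange {
      // Paraphrased: the alias names the (topic, partition, fromOffset,
      // untilOffset) tuple used when checkpointing offset ranges.
      private[kafka] type OffsetRangeTuple = (String, Int, Long, Long)
    }

    // So the annotation and the cast describe the same type:
    // mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]
    // is the same type as
    // mutable.HashMap[Time, Array[(String, Int, Long, Long)]]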
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 6d465bcb6b..4a83b715fa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -155,7 +155,7 @@ class KafkaRDD[
         .dropWhile(_.offset < requestOffset)
     }
 
-    override def close() = consumer.close()
+    override def close(): Unit = consumer.close()
 
     override def getNext(): R = {
       if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
       fromOffsets: Map[TopicAndPartition, Long],
       untilOffsets: Map[TopicAndPartition, LeaderOffset],
       messageHandler: MessageAndMetadata[K, V] => R
-      ): KafkaRDD[K, V, U, T, R] = {
+    ): KafkaRDD[K, V, U, T, R] = {
     val leaders = untilOffsets.map { case (tp, lo) =>
       tp -> (lo.host, lo.port)
     }.toMap
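The close() change above shows why the rule matters even for effectful methods: with "def close() = ...", the result type is whatever the last expression returns, not necessarily Unit. A minimal standalone sketch with a hypothetical Buffered class (not from Spark):

    class Buffered {
      private val sb = new StringBuilder

      // Without an annotation the inferred result type would be
      // StringBuilder, since StringBuilder.append returns the builder
      // itself; callers could accidentally start depending on that value.
      // def close() = sb.append("closed")

      // With ": Unit" the public contract stays fixed even if the body
      // later evaluates to a different type.
      def close(): Unit = sb.append("closed")
    }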