aboutsummaryrefslogtreecommitdiff
path: root/external/kafka/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala5
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala4
2 files changed, 5 insertions, 4 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 04e65cb3d7..1b1fc8051d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -129,8 +129,9 @@ class DirectKafkaInputDStream[
private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def batchForTime = data.asInstanceOf[mutable.HashMap[
- Time, Array[OffsetRange.OffsetRangeTuple]]]
+ def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+ data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+ }
override def update(time: Time) {
batchForTime.clear()
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 6d465bcb6b..4a83b715fa 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -155,7 +155,7 @@ class KafkaRDD[
.dropWhile(_.offset < requestOffset)
}
- override def close() = consumer.close()
+ override def close(): Unit = consumer.close()
override def getNext(): R = {
if (iter == null || !iter.hasNext) {
@@ -207,7 +207,7 @@ object KafkaRDD {
fromOffsets: Map[TopicAndPartition, Long],
untilOffsets: Map[TopicAndPartition, LeaderOffset],
messageHandler: MessageAndMetadata[K, V] => R
- ): KafkaRDD[K, V, U, T, R] = {
+ ): KafkaRDD[K, V, U, T, R] = {
val leaders = untilOffsets.map { case (tp, lo) =>
tp -> (lo.host, lo.port)
}.toMap