aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 1a9d78c0d4..ea5f842c6c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -197,7 +197,11 @@ class KafkaRDD[
.dropWhile(_.offset < requestOffset)
}
- override def close(): Unit = consumer.close()
+ override def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
override def getNext(): R = {
if (iter == null || !iter.hasNext) {