diff options
author | cody koeninger <cody@koeninger.org> | 2015-08-12 17:44:16 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-12 17:44:16 -0700 |
commit | 8ce60963cb0928058ef7b6e29ff94eb69d1143af (patch) | |
tree | 6d05e09f6fd646e28843250ff5472d6c0ba9ee0c | |
parent | 660e6dcff8125b83cc73dbe00c90cbe58744bc66 (diff) | |
download | spark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.tar.gz spark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.tar.bz2 spark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.zip |
[SPARK-9780] [STREAMING] [KAFKA] prevent NPE if KafkaRDD instantiation …
…fails
Author: cody koeninger <cody@koeninger.org>
Closes #8133 from koeninger/SPARK-9780 and squashes the following commits:
406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 1a9d78c0d4..ea5f842c6c 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -197,7 +197,11 @@ class KafkaRDD[ .dropWhile(_.offset < requestOffset) } - override def close(): Unit = consumer.close() + override def close(): Unit = { + if (consumer != null) { + consumer.close() + } + } override def getNext(): R = { if (iter == null || !iter.hasNext) { |