aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2015-08-12 17:44:16 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-12 17:44:16 -0700
commit8ce60963cb0928058ef7b6e29ff94eb69d1143af (patch)
tree6d05e09f6fd646e28843250ff5472d6c0ba9ee0c /external
parent660e6dcff8125b83cc73dbe00c90cbe58744bc66 (diff)
downloadspark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.tar.gz
spark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.tar.bz2
spark-8ce60963cb0928058ef7b6e29ff94eb69d1143af.zip
[SPARK-9780] [STREAMING] [KAFKA] prevent NPE if KafkaRDD instantiation …
…fails Author: cody koeninger <cody@koeninger.org> Closes #8133 from koeninger/SPARK-9780 and squashes the following commits: 406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 1a9d78c0d4..ea5f842c6c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -197,7 +197,11 @@ class KafkaRDD[
.dropWhile(_.offset < requestOffset)
}
- override def close(): Unit = consumer.close()
+ override def close(): Unit = {
+ if (consumer != null) {
+ consumer.close()
+ }
+ }
override def getNext(): R = {
if (iter == null || !iter.hasNext) {