From 8ce60963cb0928058ef7b6e29ff94eb69d1143af Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 12 Aug 2015 17:44:16 -0700 Subject: [SPARK-9780] [STREAMING] [KAFKA] prevent NPE if KafkaRDD instantiation … MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …fails Author: cody koeninger Closes #8133 from koeninger/SPARK-9780 and squashes the following commits: 406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails --- .../src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'external') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 1a9d78c0d4..ea5f842c6c 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -197,7 +197,11 @@ class KafkaRDD[ .dropWhile(_.offset < requestOffset) } - override def close(): Unit = consumer.close() + override def close(): Unit = { + if (consumer != null) { + consumer.close() + } + } override def getNext(): R = { if (iter == null || !iter.hasNext) { -- cgit v1.2.3