diff options
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 30 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 11 |
2 files changed, 5 insertions, 36 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index e20e2c8f26..28ac5929df 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -26,8 +26,6 @@ import java.util.concurrent.Executors import kafka.consumer._ import kafka.serializer.Decoder import kafka.utils.VerifiableProperties -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient._ import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel @@ -97,12 +95,6 @@ class KafkaReceiver[ consumerConnector = Consumer.create(consumerConfig) logInfo("Connected to " + zkConnect) - // When auto.offset.reset is defined, it is our responsibility to try and whack the - // consumer group zk node. - if (kafkaParams.contains("auto.offset.reset")) { - tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id")) - } - val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[K]] @@ -139,26 +131,4 @@ class KafkaReceiver[ } } } - - // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This - // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper. - // - // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied - // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to - // 'smallest'/'largest': - // scalastyle:off - // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala - // scalastyle:on - private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { - val dir = "/consumers/" + groupId - logInfo("Cleaning up temporary Zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - try { - zk.deleteRecursive(dir) - } catch { - case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e) - } finally { - zk.close() - } - } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 48668f763e..ec812e1ef3 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -17,19 +17,18 @@ package org.apache.spark.streaming.kafka -import scala.reflect.ClassTag -import scala.collection.JavaConversions._ - import java.lang.{Integer => JInt} import java.util.{Map => JMap} +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + import kafka.serializer.{Decoder, StringDecoder} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} - +import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream object KafkaUtils { /** |