Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala  30
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala  11
2 files changed, 5 insertions, 36 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index e20e2c8f26..28ac5929df 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -26,8 +26,6 @@ import java.util.concurrent.Executors
import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -97,12 +95,6 @@ class KafkaReceiver[
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)
- // When auto.offset.reset is defined, it is our responsibility to try and whack the
- // consumer group zk node.
- if (kafkaParams.contains("auto.offset.reset")) {
- tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
- }
-
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
@@ -139,26 +131,4 @@ class KafkaReceiver[
}
}
}
-
- // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
- // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
- //
- // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
- // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
- // 'smallest'/'largest':
- // scalastyle:off
- // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
- // scalastyle:on
- private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
- val dir = "/consumers/" + groupId
- logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
- try {
- zk.deleteRecursive(dir)
- } catch {
- case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
- } finally {
- zk.close()
- }
- }
}
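
Context for the removal above: as the deleted comment itself notes, the helper existed only to work around Kafka 0.7.2, which honored auto.offset.reset only while the consumer group had no state in Zookeeper. Presumably the module now targets the 0.8 high-level consumer, which applies the parameter itself, so the receiver no longer needs to whack the group's /consumers/<group.id> node before connecting. A minimal usage sketch of the surviving path, assuming placeholder ZK quorum, group id, and topic names (none of these appear in the patch):

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object OffsetResetSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("OffsetResetSketch"), Seconds(2))

    // The reset request now rides along in kafkaParams; no Zookeeper
    // cleanup happens on the Spark side before the consumer connects.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",   // placeholder ZK quorum
      "group.id"          -> "example-group",    // placeholder consumer group
      "auto.offset.reset" -> "smallest")         // honored by the Kafka 0.8 consumer itself

    val topics = Map("example-topic" -> 1)       // topic -> consumer thread count

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
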
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 48668f763e..ec812e1ef3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -17,19 +17,18 @@
package org.apache.spark.streaming.kafka
-import scala.reflect.ClassTag
-import scala.collection.JavaConversions._
-
import java.lang.{Integer => JInt}
import java.util.{Map => JMap}
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
-
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
object KafkaUtils {
/**