aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
author seanm <sean.mcnamara@webtrends.com> 2013-03-26 23:56:15 -0600
committer seanm <sean.mcnamara@webtrends.com> 2013-03-26 23:56:15 -0600
commit 329ef34c2e04d28c2ad150cf6674d6e86d7511ce (patch)
tree 29b935e38ddce61b05eaf316552787127c698fa6 /streaming/src/main
parent d61978d0abad30a148680c8a63df33e40e469525 (diff)
download spark-329ef34c2e04d28c2ad150cf6674d6e86d7511ce.tar.gz
spark-329ef34c2e04d28c2ad150cf6674d6e86d7511ce.tar.bz2
spark-329ef34c2e04d28c2ad150cf6674d6e86d7511ce.zip
fixing autooffset.reset behavior when set to 'largest'
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 10
1 files changed, 6 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 85693808d1..17a5be3420 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -75,9 +75,9 @@ class KafkaReceiver(kafkaParams: Map[String, String],
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + kafkaParams("zk.connect"))
- // When autooffset.reset is 'smallest', it is our responsibility to try and whack the
+ // When autooffset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
- if (kafkaParams.get("autooffset.reset").exists(_ == "smallest")) {
+ if (kafkaParams.contains("autooffset.reset")) {
tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
}
@@ -100,9 +100,11 @@ class KafkaReceiver(kafkaParams: Map[String, String],
}
}
- // Delete consumer group from zookeeper. This effectivly resets the group so we can consume from the beginning again.
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
+ // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ //
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
- // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest':
+ // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try {