diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-01-14 17:13:10 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-01-14 17:13:10 -0700 |
commit | b61a4ec77300d6e7fb40f771a9054ae8bc4488de (patch) | |
tree | 12a59533e1316dcd50c9e970b588ce9af92a52dc | |
parent | 82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff) | |
download | spark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.tar.gz spark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.tar.bz2 spark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.zip |
Removing offset management code that is non-existent in kafka 0.7.0+
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 7 |
1 files changed, 0 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 2b4740bdf7..9605072382 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -173,13 +173,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String, stream.takeWhile { msgAndMetadata => blockGenerator += msgAndMetadata.message - // Updating the offet. The key is (broker, topic, group, partition). - val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, - groupId, msgAndMetadata.topicInfo.partition.partId) - val offset = msgAndMetadata.topicInfo.getConsumeOffset - offsets.put(key, offset) - // logInfo("Handled message: " + (key, offset).toString) - // Keep on handling messages true } |