aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-01-14 17:13:10 -0700
committerseanm <sean.mcnamara@webtrends.com>2013-01-14 17:13:10 -0700
commitb61a4ec77300d6e7fb40f771a9054ae8bc4488de (patch)
tree12a59533e1316dcd50c9e970b588ce9af92a52dc /streaming/src
parent82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff)
downloadspark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.tar.gz
spark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.tar.bz2
spark-b61a4ec77300d6e7fb40f771a9054ae8bc4488de.zip
Removing offset management code that is non-existent in kafka 0.7.0+
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala7
1 files changed, 0 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 2b4740bdf7..9605072382 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -173,13 +173,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
stream.takeWhile { msgAndMetadata =>
blockGenerator += msgAndMetadata.message
- // Updating the offet. The key is (broker, topic, group, partition).
- val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
- groupId, msgAndMetadata.topicInfo.partition.partId)
- val offset = msgAndMetadata.topicInfo.getConsumeOffset
- offsets.put(key, offset)
- // logInfo("Handled message: " + (key, offset).toString)
-
// Keep on handling messages
true
}