From deb2c4df72f65f2bd90cc97a9abcd59a12eecabc Mon Sep 17 00:00:00 2001 From: Denny Date: Sun, 11 Nov 2012 11:11:49 -0800 Subject: Add comment. --- streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala | 2 ++ 1 file changed, 2 insertions(+) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala index ad8e86a094..cc74855983 100644 --- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala @@ -66,6 +66,8 @@ class KafkaInputDStream[T: ClassManifest]( val latestOffsets = savedOffsets(key) logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) + // TODO: This may throw out offsets that are created after the checkpoint, + // but it's unlikely we'll need them. savedOffsets.clear() } } -- cgit v1.2.3