diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala | 2 |
1 files changed, 2 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala index ad8e86a094..cc74855983 100644 --- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala @@ -66,6 +66,8 @@ class KafkaInputDStream[T: ClassManifest]( val latestOffsets = savedOffsets(key) logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) + // TODO: This may throw out offsets that are created after the checkpoint, + // but it's unlikely we'll need them. savedOffsets.clear() } } |