aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index ad8e86a094..cc74855983 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -66,6 +66,8 @@ class KafkaInputDStream[T: ClassManifest](
val latestOffsets = savedOffsets(key)
logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
+ // TODO: This may throw out offsets that are created after the checkpoint,
+ // but it's unlikely we'll need them.
savedOffsets.clear()
}
}