aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-11-11 11:11:49 -0800
committerDenny <dennybritz@gmail.com>2012-11-11 11:11:49 -0800
commitdeb2c4df72f65f2bd90cc97a9abcd59a12eecabc (patch)
tree969ed76348584079a73261ec2f3df0d793d4bc7e /streaming/src
parentd006109e9504b3221de3a15f9bfee96dafa8b593 (diff)
downloadspark-deb2c4df72f65f2bd90cc97a9abcd59a12eecabc.tar.gz
spark-deb2c4df72f65f2bd90cc97a9abcd59a12eecabc.tar.bz2
spark-deb2c4df72f65f2bd90cc97a9abcd59a12eecabc.zip
Add comment.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index ad8e86a094..cc74855983 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -66,6 +66,8 @@ class KafkaInputDStream[T: ClassManifest](
val latestOffsets = savedOffsets(key)
logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
+ // TODO: This may throw out offsets that are created after the checkpoint,
+ // but it's unlikely we'll need them.
savedOffsets.clear()
}
}