author     Tathagata Das <tathagata.das1565@gmail.com>  2012-11-05 17:53:56 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-11-05 17:53:56 -0800
commit     f8bb719cd212f7e7f821c3f69b897985f47a2f83 (patch)
tree       f333bc57eb642850ca92b600ac0968c56d1f3816 /streaming
parent     395167f2b2a1906cde23b1f3ddc2808514bce47b (diff)
Added a few more comments to the checkpoint-related functions.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 8
1 file changed, 8 insertions(+), 0 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 922ff5088d..40744eac19 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -288,20 +288,27 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
+ // Get the checkpointed RDDs from the generated RDDs
val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
.map(x => (x._1, x._2.getCheckpointData()))
+ // Make a copy of the existing checkpoint data
val oldCheckpointData = checkpointData.clone()
+
+ // If the new checkpoint data is non-empty, replace the existing checkpoint data with it
if (newCheckpointData.size > 0) {
checkpointData.clear()
checkpointData ++= newCheckpointData
}
+ // Make dependencies update their checkpoint data
dependencies.foreach(_.updateCheckpointData(currentTime))
+ // TODO: remove this, this is just for debugging
newCheckpointData.foreach {
case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
+ // Delete the files of old checkpoints that are no longer present in the new checkpoint data
if (newCheckpointData.size > 0) {
(oldCheckpointData -- newCheckpointData.keySet).foreach {
case (time, data) => {
@@ -322,6 +329,7 @@ extends Serializable with Logging {
* override the updateCheckpointData() method would also need to override this method.
*/
protected[streaming] def restoreCheckpointData() {
+ // Create RDDs from the checkpoint data
logInfo("Restoring checkpoint data from " + checkpointData.size + " checkpointed RDDs")
checkpointData.foreach {
case(time, data) => {
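
The doc comments in this diff note that a DStream subclass may override updateCheckpointData() to save custom checkpoint data, and that any subclass doing so must also override restoreCheckpointData(). Below is a minimal sketch of what such a pair of overrides might look like against the DStream API of this era; the class name StateSavingDStream, the custom-state placeholders, and the exact member signatures (slideTime, compute, getOrCompute) are assumptions for illustration, not code from this commit.

    // Hypothetical subclass; placed in package spark.streaming so it can
    // override the protected[streaming] checkpoint hooks.
    package spark.streaming

    import spark.RDD

    class StateSavingDStream[T: ClassManifest](parent: DStream[T])
      extends DStream[T](parent.ssc) {

      override def dependencies: List[DStream[_]] = List(parent)

      override def slideTime: Time = parent.slideTime

      override def compute(validTime: Time): Option[RDD[T]] =
        parent.getOrCompute(validTime)

      // Save custom state alongside the standard checkpoint bookkeeping.
      override protected[streaming] def updateCheckpointData(currentTime: Time) {
        super.updateCheckpointData(currentTime) // records RDD checkpoint files
        // hypothetical: persist any extra state needed to rebuild this stream
      }

      // Restore the custom state before the checkpointed RDDs are recreated.
      override protected[streaming] def restoreCheckpointData() {
        // hypothetical: reload the extra state persisted above
        super.restoreCheckpointData() // recreates RDDs from checkpoint files
      }
    }

Calling super first in updateCheckpointData() keeps the standard behavior shown in this diff intact (replacing the checkpoint map and deleting stale files), while restoreCheckpointData() reverses the order so any custom state is in place before the RDDs are rebuilt.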