author     Tathagata Das <tathagata.das1565@gmail.com>   2012-12-04 22:10:25 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-12-04 22:10:25 -0800
commit     21a08529768a5073bc5c15b6c2642ceef2acd0d5 (patch)
tree       c3b20c818cf197e8933dd134ba6cea6bcae46027 /streaming
parent     a69a82be2682148f5d1ebbdede15a47c90eea73d (diff)
Refactored RDD checkpointing to minimize extra fields in RDD class.
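The change moves checkpoint bookkeeping out of the RDD class into a single
checkpoint-data holder, exposed through an Option instead of a nullable field.
A minimal Scala sketch of that shape (the names cpFile, checkpointData, and
getCheckpointFile mirror the diff below; the surrounding class layout is an
illustrative assumption, not Spark's actual source):

  // Sketch only: one holder per RDD keeps all checkpoint state, so the
  // RDD class itself needs just this single extra field.
  class RDDCheckpointData {
    // Path of the file this RDD was checkpointed to, if any
    var cpFile: Option[String] = None
  }

  class SketchRDD {
    val checkpointData = new RDDCheckpointData
    // An RDD counts as checkpointed exactly when a file is recorded
    def getCheckpointFile: Option[String] = checkpointData.cpFile
  }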
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 7
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d2e9de110e..d290c5927e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -325,8 +325,9 @@ extends Serializable with Logging {
     logInfo("Updating checkpoint data for time " + currentTime)
 
     // Get the checkpointed RDDs from the generated RDDs
-    val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
-      .map(x => (x._1, x._2.getCheckpointData()))
+
+    val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+      .map(x => (x._1, x._2.getCheckpointFile.get))
 
     // Make a copy of the existing checkpoint data
     val oldCheckpointData = checkpointData.clone()
@@ -373,7 +374,7 @@ extends Serializable with Logging {
       logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
       val rdd = ssc.sc.objectFile[T](data.toString)
       // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
-      rdd.checkpointFile = data.toString
+      rdd.checkpointData.cpFile = Some(data.toString)
       generatedRDDs += ((time, rdd))
     }
   }
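The first hunk above swaps updateCheckpointData() from null checks to this
Option API. A self-contained usage sketch of the filter/map pattern (Time is
simplified to Long, the RDD to a stub class, and the paths are made up; none
of this is Spark's real signature):

  import scala.collection.mutable

  object CheckpointSketch extends App {
    class RDDCheckpointData { var cpFile: Option[String] = None }
    class SketchRDD(path: Option[String] = None) {
      val checkpointData = new RDDCheckpointData
      checkpointData.cpFile = path
      def getCheckpointFile: Option[String] = checkpointData.cpFile
    }

    val generatedRDDs = mutable.HashMap[Long, SketchRDD](
      1L -> new SketchRDD(Some("hdfs://checkpoints/rdd-1")),
      2L -> new SketchRDD()  // never checkpointed; filtered out below
    )

    // Keep only RDDs with a defined checkpoint file, then pair each batch
    // time with that file, as the new updateCheckpointData() code does.
    val newCheckpointData = generatedRDDs
      .filter { case (_, rdd) => rdd.getCheckpointFile.isDefined }
      .map    { case (time, rdd) => (time, rdd.getCheckpointFile.get) }

    println(newCheckpointData)  // HashMap(1 -> hdfs://checkpoints/rdd-1)
  }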