Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 792c129be8..d5048aeed7 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -334,20 +334,22 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
-
logInfo("Updating checkpoint data for time " + currentTime)
// Get the checkpointed RDDs from the generated RDDs
- val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
- .map(x => (x._1, x._2.getCheckpointData()))
- // Make a copy of the existing checkpoint data
+ val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
+
+ // Make a copy of the existing checkpoint data (checkpointed RDDs)
val oldRdds = checkpointData.rdds.clone()
- // If the new checkpoint has checkpoints then replace existing with the new one
+
+ // If the new checkpoint data has checkpoints then replace existing with the new one
if (newRdds.size > 0) {
checkpointData.rdds.clear()
checkpointData.rdds ++= newRdds
}
- // Make dependencies update their checkpoint data
+
+ // Make parent DStreams update their checkpoint data
dependencies.foreach(_.updateCheckpointData(currentTime))
// TODO: remove this, this is just for debugging
@@ -381,9 +383,7 @@ extends Serializable with Logging {
checkpointData.rdds.foreach {
case(time, data) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.objectFile[T](data.toString)
- // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointFile = data.toString
+ val rdd = ssc.sc.checkpointFile[T](data.toString)
generatedRDDs += ((time, rdd))
}
}
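
To make the intent of the two hunks concrete, the following is a minimal, standalone sketch of the round trip they rely on: checkpoint an RDD, record its checkpoint file (the value updateCheckpointData now stores instead of the old getCheckpointData() result), and rebuild the RDD later with checkpointFile rather than objectFile plus a manual checkpointFile assignment. This is an illustration, not code from the commit: the master string, application name and checkpoint directory are placeholders, and it assumes SparkContext.checkpointFile is visible to the calling code (the diff only invokes it from within spark.streaming).

import spark.SparkContext

// Hedged sketch of the checkpoint save/restore round trip; not part of the commit.
object CheckpointRoundTripSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "checkpoint-round-trip")   // placeholder master/app name
    sc.setCheckpointDir("/tmp/checkpoints")                       // placeholder directory

    val rdd = sc.parallelize(1 to 100)
    rdd.checkpoint()   // mark the RDD for checkpointing
    rdd.count()        // run a job so the checkpoint file is actually written

    // After checkpointing, getCheckpointFile is defined; this is the predicate
    // updateCheckpointData now filters on instead of getCheckpointData() != null.
    val file = rdd.getCheckpointFile.get

    // Restoring with checkpointFile (rather than objectFile) yields an RDD that is
    // itself recognized as checkpointed, so the manual checkpointFile assignment
    // removed in the second hunk is no longer needed.
    val restored = sc.checkpointFile[Int](file)
    println("restored " + restored.count() + " elements from " + file)
  }
}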