aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d2e9de110e..d290c5927e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -325,8 +325,9 @@ extends Serializable with Logging {
logInfo("Updating checkpoint data for time " + currentTime)
// Get the checkpointed RDDs from the generated RDDs
- val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
- .map(x => (x._1, x._2.getCheckpointData()))
+
+ val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
// Make a copy of the existing checkpoint data
val oldCheckpointData = checkpointData.clone()
@@ -373,7 +374,7 @@ extends Serializable with Logging {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
val rdd = ssc.sc.objectFile[T](data.toString)
// Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointFile = data.toString
+ rdd.checkpointData.cpFile = Some(data.toString)
generatedRDDs += ((time, rdd))
}
}