diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 792c129be8..d5048aeed7 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -334,20 +334,22 @@ extends Serializable with Logging { * this method to save custom checkpoint data. */ protected[streaming] def updateCheckpointData(currentTime: Time) { - logInfo("Updating checkpoint data for time " + currentTime) // Get the checkpointed RDDs from the generated RDDs - val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null) - .map(x => (x._1, x._2.getCheckpointData())) - // Make a copy of the existing checkpoint data + val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + + // Make a copy of the existing checkpoint data (checkpointed RDDs) val oldRdds = checkpointData.rdds.clone() - // If the new checkpoint has checkpoints then replace existing with the new one + + // If the new checkpoint data has checkpoints then replace existing with the new one if (newRdds.size > 0) { checkpointData.rdds.clear() checkpointData.rdds ++= newRdds } - // Make dependencies update their checkpoint data + + // Make parent DStreams update their checkpoint data dependencies.foreach(_.updateCheckpointData(currentTime)) // TODO: remove this, this is just for debugging @@ -381,9 +383,7 @@ extends Serializable with Logging { checkpointData.rdds.foreach { case(time, data) => { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") - val rdd = ssc.sc.objectFile[T](data.toString) - // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData() - rdd.checkpointFile = data.toString + val rdd = ssc.sc.checkpointFile[T](data.toString) generatedRDDs += ((time, rdd)) } } |