diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-04 22:10:25 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-04 22:10:25 -0800 |
commit | 21a08529768a5073bc5c15b6c2642ceef2acd0d5 (patch) | |
tree | c3b20c818cf197e8933dd134ba6cea6bcae46027 /streaming | |
parent | a69a82be2682148f5d1ebbdede15a47c90eea73d (diff) | |
download | spark-21a08529768a5073bc5c15b6c2642ceef2acd0d5.tar.gz spark-21a08529768a5073bc5c15b6c2642ceef2acd0d5.tar.bz2 spark-21a08529768a5073bc5c15b6c2642ceef2acd0d5.zip |
Refactored RDD checkpointing to minimize extra fields in RDD class.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 7 |
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index d2e9de110e..d290c5927e 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -325,8 +325,9 @@ extends Serializable with Logging { logInfo("Updating checkpoint data for time " + currentTime) // Get the checkpointed RDDs from the generated RDDs - val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null) - .map(x => (x._1, x._2.getCheckpointData())) + + val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) // Make a copy of the existing checkpoint data val oldCheckpointData = checkpointData.clone() @@ -373,7 +374,7 @@ extends Serializable with Logging { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") val rdd = ssc.sc.objectFile[T](data.toString) // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData() - rdd.checkpointFile = data.toString + rdd.checkpointData.cpFile = Some(data.toString) generatedRDDs += ((time, rdd)) } } |