diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-11 15:36:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-11 15:36:12 -0800 |
commit | 8e74fac215e8b9cda7e35111c5116e3669c6eb97 (patch) | |
tree | cb460c247109d8028aadc1f7d112d35f4f204ffc /streaming | |
parent | fa28f25619d6712e5f920f498ec03085ea208b4d (diff) | |
download | spark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.tar.gz spark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.tar.bz2 spark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.zip |
Made checkpoint data in RDDs optional to further reduce serialized size.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 4 |
1 file changed, 1 insertion, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index d290c5927e..69fefa21a0 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -372,9 +372,7 @@ extends Serializable with Logging { checkpointData.foreach { case(time, data) => { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") - val rdd = ssc.sc.objectFile[T](data.toString) - // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData() - rdd.checkpointData.cpFile = Some(data.toString) + val rdd = ssc.sc.checkpointFile[T](data.toString) generatedRDDs += ((time, rdd)) } } |