aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-11 15:36:12 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-11 15:36:12 -0800
commit8e74fac215e8b9cda7e35111c5116e3669c6eb97 (patch)
treecb460c247109d8028aadc1f7d112d35f4f204ffc /streaming
parentfa28f25619d6712e5f920f498ec03085ea208b4d (diff)
downloadspark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.tar.gz
spark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.tar.bz2
spark-8e74fac215e8b9cda7e35111c5116e3669c6eb97.zip
Made checkpoint data in RDDs optional to further reduce serialized size.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala4
1 files changed, 1 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d290c5927e..69fefa21a0 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -372,9 +372,7 @@ extends Serializable with Logging {
checkpointData.foreach {
case(time, data) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.objectFile[T](data.toString)
- // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointData.cpFile = Some(data.toString)
+ val rdd = ssc.sc.checkpointFile[T](data.toString)
generatedRDDs += ((time, rdd))
}
}