From 8e74fac215e8b9cda7e35111c5116e3669c6eb97 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 11 Dec 2012 15:36:12 -0800
Subject: Made checkpoint data in RDDs optional to further reduce serialized size.

---
 streaming/src/main/scala/spark/streaming/DStream.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d290c5927e..69fefa21a0 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -372,9 +372,7 @@ extends Serializable with Logging {
     checkpointData.foreach {
       case(time, data) => {
         logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
-        val rdd = ssc.sc.objectFile[T](data.toString)
-        // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
-        rdd.checkpointData.cpFile = Some(data.toString)
+        val rdd = ssc.sc.checkpointFile[T](data.toString)
         generatedRDDs += ((time, rdd))
       }
     }
-- 
cgit v1.2.3