author     Tathagata Das <tathagata.das1565@gmail.com>  2012-12-04 22:10:25 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-12-04 22:10:25 -0800
commit     21a08529768a5073bc5c15b6c2642ceef2acd0d5 (patch)
tree       c3b20c818cf197e8933dd134ba6cea6bcae46027
parent     a69a82be2682148f5d1ebbdede15a47c90eea73d (diff)
Refactored RDD checkpointing to minimize extra fields in RDD class.
-rw-r--r--  core/src/main/scala/spark/RDD.scala                      | 149
-rw-r--r--  core/src/main/scala/spark/RDDCheckpointData.scala        |  68
-rw-r--r--  core/src/main/scala/spark/rdd/BlockRDD.scala             |   9
-rw-r--r--  core/src/main/scala/spark/rdd/CartesianRDD.scala         |  10
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala         |   2
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala         |   2
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala            |   2
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala         |   2
-rw-r--r--  core/src/main/scala/spark/rdd/UnionRDD.scala             |  12
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala   |   2
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala          |  73
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala   |   7
12 files changed, 144 insertions(+), 194 deletions(-)
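
The diffstat summarizes the refactor: mutable checkpoint state leaves RDD and moves into a new RDDCheckpointData class. For orientation, the user-facing workflow after this patch looks roughly like the sketch below. This is hypothetical driver code, assuming a running SparkContext `sc` with a checkpoint directory configured; only checkpoint(), isCheckpointed() and getCheckpointFile() are taken from the patch itself.

    // Hypothetical driver-side walkthrough of the refactored API; assumes a
    // running SparkContext `sc` with a checkpoint directory configured.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val data    = sc.makeRDD(1 to 1000, 4)
    val derived = data.map(_ * 2).cache()  // persist first so checkpointing avoids recomputation

    derived.checkpoint()                   // only marks the RDD; nothing is written yet
    derived.count()                        // first job materializes the RDD, then doCheckpoint() runs

    assert(derived.isCheckpointed())       // state now lives behind derived.checkpointData
    println(derived.getCheckpointFile())   // Some(<checkpointDir>/rdd-<id>) once finished
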
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fbfcfbd704..e9bd131e61 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -99,13 +99,7 @@ abstract class RDD[T: ClassManifest](
val partitioner: Option[Partitioner] = None
/** Optionally overridden by subclasses to specify placement preferences. */
- def preferredLocations(split: Split): Seq[String] = {
- if (isCheckpointed) {
- checkpointRDD.preferredLocations(split)
- } else {
- Nil
- }
- }
+ def preferredLocations(split: Split): Seq[String] = Nil
/** The [[spark.SparkContext]] that this RDD was created on. */
def context = sc
@@ -118,6 +112,8 @@ abstract class RDD[T: ClassManifest](
// Variables relating to persistence
private var storageLevel: StorageLevel = StorageLevel.NONE
+ protected[spark] val checkpointData = new RDDCheckpointData(this)
+
/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassManifest] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
@@ -126,17 +122,6 @@ abstract class RDD[T: ClassManifest](
/** Returns the `i` th parent RDD */
protected[spark] def parent[U: ClassManifest](i: Int) = dependencies(i).rdd.asInstanceOf[RDD[U]]
- // Variables relating to checkpointing
- protected val isCheckpointable = true // override to set this to false to avoid checkpointing an RDD
-
- protected var shouldCheckpoint = false // set to true when an RDD is marked for checkpointing
- protected var isCheckpointInProgress = false // set to true when checkpointing is in progress
- protected[spark] var isCheckpointed = false // set to true after checkpointing is completed
-
- protected[spark] var checkpointFile: String = null // set to the checkpoint file after checkpointing is completed
- protected var checkpointRDD: RDD[T] = null // set to the HadoopRDD of the checkpoint file
- protected var checkpointRDDSplits: Seq[Split] = null // set to the splits of the Hadoop RDD
-
// Methods available on all RDDs:
/**
@@ -162,84 +147,15 @@ abstract class RDD[T: ClassManifest](
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
- /**
- * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
- * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
- * This is used to truncate very long lineages. In the current implementation, Spark will save
- * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
- * Hence, it is strongly recommended to use checkpoint() on RDDs when
- * (i) Checkpoint() is called before the any job has been executed on this RDD.
- * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will
- * require recomputation.
- */
- protected[spark] def checkpoint() {
- synchronized {
- if (isCheckpointed || shouldCheckpoint || isCheckpointInProgress) {
- // do nothing
- } else if (isCheckpointable) {
- if (sc.checkpointDir == null) {
- throw new Exception("Checkpoint directory has not been set in the SparkContext.")
- }
- shouldCheckpoint = true
- } else {
- throw new Exception(this + " cannot be checkpointed")
- }
- }
- }
-
- def getCheckpointData(): Any = {
- synchronized {
- checkpointFile
- }
- }
-
- /**
- * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler after a job
- * using this RDD has completed (therefore the RDD has been materialized and
- * potentially stored in memory). In case this RDD is not marked for checkpointing,
- * doCheckpoint() is called recursively on the parent RDDs.
- */
- private[spark] def doCheckpoint() {
- val startCheckpoint = synchronized {
- if (isCheckpointable && shouldCheckpoint && !isCheckpointInProgress) {
- isCheckpointInProgress = true
- true
- } else {
- false
- }
- }
-
- if (startCheckpoint) {
- val rdd = this
- rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
- rdd.saveAsObjectFile(checkpointFile)
- rdd.synchronized {
- rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size)
- rdd.checkpointRDDSplits = rdd.checkpointRDD.splits
- rdd.changeDependencies(rdd.checkpointRDD)
- rdd.shouldCheckpoint = false
- rdd.isCheckpointInProgress = false
- rdd.isCheckpointed = true
- println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
- }
+ def getPreferredLocations(split: Split) = {
+ if (isCheckpointed) {
+ checkpointData.preferredLocations(split)
} else {
- // Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
- dependencies.foreach(_.rdd.doCheckpoint())
+ preferredLocations(split)
}
}
/**
- * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]]
- * (`newRDD`) created from the checkpoint file. This method must ensure that all references
- * to the original parent RDDs must be removed to enable the parent RDDs to be garbage
- * collected. Subclasses of RDD may override this method for implementing their own changing
- * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
- */
- protected def changeDependencies(newRDD: RDD[_]) {
- dependencies_ = List(new OneToOneDependency(newRDD))
- }
-
- /**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
@@ -247,7 +163,7 @@ abstract class RDD[T: ClassManifest](
final def iterator(split: Split): Iterator[T] = {
if (isCheckpointed) {
// ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
- checkpointRDD.iterator(checkpointRDDSplits(split.index))
+ checkpointData.iterator(split.index)
} else if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
} else {
@@ -589,6 +505,55 @@ abstract class RDD[T: ClassManifest](
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
+ /**
+ * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
+ * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
+ * This is used to truncate very long lineages. In the current implementation, Spark will save
+ * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
+ * Hence, it is strongly recommended that (i) checkpoint() be called before any job has been
+ * executed on this RDD, and (ii) this RDD be persisted in memory, since otherwise saving it
+ * to a file will require recomputation.
+ */
+ def checkpoint() {
+ checkpointData.markForCheckpoint()
+ }
+
+ /**
+ * Return whether this RDD has been checkpointed or not
+ */
+ def isCheckpointed(): Boolean = {
+ checkpointData.isCheckpointed()
+ }
+
+ /**
+ * Gets the name of the file to which this RDD was checkpointed
+ */
+ def getCheckpointFile(): Option[String] = {
+ checkpointData.getCheckpointFile()
+ }
+
+ /**
+ * Performs the checkpointing of this RDD by saving it to a file. This is called by the
+ * DAGScheduler after a job using this RDD has completed (therefore the RDD has been
+ * materialized and potentially stored in memory). doCheckpoint() is called recursively on
+ * the parent RDDs.
+ */
+ protected[spark] def doCheckpoint() {
+ checkpointData.doCheckpoint()
+ dependencies.foreach(_.rdd.doCheckpoint())
+ }
+
+ /**
+ * Changes the dependencies of this RDD from its original parents to the new
+ * [[spark.rdd.HadoopRDD]] (`newRDD`) created from the checkpoint file. This method must
+ * ensure that all references to the original parent RDDs are removed, so that the parent
+ * RDDs can be garbage collected. Subclasses of RDD may override this method to implement
+ * their own dependency-changing logic. See [[spark.rdd.UnionRDD]] and
+ * [[spark.rdd.ShuffledRDD]] to get a better idea.
+ */
+ protected[spark] def changeDependencies(newRDD: RDD[_]) {
+ dependencies_ = List(new OneToOneDependency(newRDD))
+ }
+
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
synchronized {
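
The net effect of the RDD.scala changes is a thin facade: RDD keeps a single protected checkpointData field and forwards its public checkpoint API to it. A toy sketch of that delegation shape, with made-up types standing in for the real spark.RDD and spark.RDDCheckpointData:

    // Toy illustration (not the real spark.RDD) of the facade shape: the RDD owns
    // a single serializable state holder and forwards its public checkpoint API
    // to it, synchronizing on the owning RDD throughout.
    class CheckpointHolder(owner: AnyRef) extends Serializable {
      private var marked = false
      private var file: Option[String] = None

      def markForCheckpoint(): Unit         = owner.synchronized { marked = true }
      def getCheckpointFile: Option[String] = owner.synchronized { file }
      def isCheckpointed: Boolean           = owner.synchronized { file.isDefined }
    }

    abstract class ToyRDD extends Serializable {
      protected val checkpointData = new CheckpointHolder(this)

      def checkpoint(): Unit                = checkpointData.markForCheckpoint()
      def isCheckpointed: Boolean           = checkpointData.isCheckpointed
      def getCheckpointFile: Option[String] = checkpointData.getCheckpointFile
    }
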
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
new file mode 100644
index 0000000000..eb4482acee
--- /dev/null
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -0,0 +1,68 @@
+package spark
+
+import org.apache.hadoop.fs.Path
+
+
+
+private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
+extends Serializable {
+
+ class CheckpointState extends Serializable {
+ var state = 0
+
+ def mark() { if (state == 0) state = 1 }
+ def start() { assert(state == 1); state = 2 }
+ def finish() { assert(state == 2); state = 3 }
+
+ def isMarked() = { state == 1 }
+ def isInProgress = { state == 2 }
+ def isCheckpointed = { state == 3 }
+ }
+
+ val cpState = new CheckpointState()
+ var cpFile: Option[String] = None
+ var cpRDD: Option[RDD[T]] = None
+ var cpRDDSplits: Seq[Split] = Nil
+
+ def markForCheckpoint() = {
+ rdd.synchronized { cpState.mark() }
+ }
+
+ def isCheckpointed() = {
+ rdd.synchronized { cpState.isCheckpointed }
+ }
+
+ def getCheckpointFile() = {
+ rdd.synchronized { cpFile }
+ }
+
+ def doCheckpoint() {
+ rdd.synchronized {
+ if (cpState.isMarked && !cpState.isInProgress) {
+ cpState.start()
+ } else {
+ return
+ }
+ }
+
+ val file = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString
+ rdd.saveAsObjectFile(file)
+ val newRDD = rdd.context.objectFile[T](file, rdd.splits.size)
+
+ rdd.synchronized {
+ rdd.changeDependencies(newRDD)
+ cpFile = Some(file)
+ cpRDD = Some(newRDD)
+ cpRDDSplits = newRDD.splits
+ cpState.finish()
+ }
+ }
+
+ def preferredLocations(split: Split) = {
+ cpRDD.get.preferredLocations(split)
+ }
+
+ def iterator(splitIndex: Int): Iterator[T] = {
+ cpRDD.get.iterator(cpRDDSplits(splitIndex))
+ }
+}
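
doCheckpoint() above takes the state transition under rdd.synchronized but performs the expensive save outside the lock, so concurrent callers race only on a cheap flag flip. A standalone sketch of that pattern in plain Scala (names are illustrative, not from the patch):

    // Standalone sketch of the CheckpointState machine and the guarded
    // doCheckpoint() pattern: mark -> start -> finish, where the transition is
    // taken under a lock but the expensive work happens outside it.
    class State {
      private var s = 0                 // 0 = none, 1 = marked, 2 = in progress, 3 = done
      def mark(): Unit        = synchronized { if (s == 0) s = 1 }
      def tryStart(): Boolean = synchronized { if (s == 1) { s = 2; true } else false }
      def finish(): Unit      = synchronized { assert(s == 2); s = 3 }
      def isDone: Boolean     = synchronized { s == 3 }
    }

    object Demo extends App {
      val st = new State
      st.mark()
      if (st.tryStart()) {              // only the first caller wins the transition
        // ... expensive save-to-file work would happen here, outside the lock ...
        st.finish()
      }
      assert(st.isDone)
    }
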
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index f4c3f99011..590f9eb738 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -41,12 +41,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
}
}
- override def preferredLocations(split: Split) = {
- if (isCheckpointed) {
- checkpointRDD.preferredLocations(split)
- } else {
- locations_(split.asInstanceOf[BlockRDDSplit].blockId)
- }
- }
+ override def preferredLocations(split: Split) =
+ locations_(split.asInstanceOf[BlockRDDSplit].blockId)
}
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 458ad38d55..9bfc3f8ca3 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -32,12 +32,8 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def splits = splits_
override def preferredLocations(split: Split) = {
- if (isCheckpointed) {
- checkpointRDD.preferredLocations(split)
- } else {
- val currSplit = split.asInstanceOf[CartesianSplit]
- rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
- }
+ val currSplit = split.asInstanceOf[CartesianSplit]
+ rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
}
override def compute(split: Split) = {
@@ -56,7 +52,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def dependencies = deps_
- override protected def changeDependencies(newRDD: RDD[_]) {
+ override def changeDependencies(newRDD: RDD[_]) {
deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
splits_ = newRDD.splits
rdd1 = null
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 94ef1b56e8..adfecea966 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -112,7 +112,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
map.iterator
}
- override protected def changeDependencies(newRDD: RDD[_]) {
+ override def changeDependencies(newRDD: RDD[_]) {
deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
splits_ = newRDD.splits
rdds = null
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 5b5f72ddeb..90c3b8bfd8 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -48,7 +48,7 @@ class CoalescedRDD[T: ClassManifest](
override def dependencies = deps_
- override protected def changeDependencies(newRDD: RDD[_]) {
+ override def changeDependencies(newRDD: RDD[_]) {
deps_ = List(new OneToOneDependency(newRDD))
splits_ = newRDD.splits
prev = null
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 19ed56d9c0..a12531ea89 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -115,6 +115,4 @@ class HadoopRDD[K, V](
val hadoopSplit = split.asInstanceOf[HadoopSplit]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}
-
- override val isCheckpointable = false
}
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 2875abb2db..c12df5839e 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -93,6 +93,4 @@ class NewHadoopRDD[K, V](
val theSplit = split.asInstanceOf[NewHadoopSplit]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
}
-
- override val isCheckpointable = false
}
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 643a174160..30eb8483b6 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -49,15 +49,11 @@ class UnionRDD[T: ClassManifest](
override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator()
- override def preferredLocations(s: Split): Seq[String] = {
- if (isCheckpointed) {
- checkpointRDD.preferredLocations(s)
- } else {
- s.asInstanceOf[UnionSplit[T]].preferredLocations()
- }
- }
+ override def preferredLocations(s: Split): Seq[String] =
+ s.asInstanceOf[UnionSplit[T]].preferredLocations()
+
- override protected def changeDependencies(newRDD: RDD[_]) {
+ override def changeDependencies(newRDD: RDD[_]) {
deps_ = List(new OneToOneDependency(newRDD))
splits_ = newRDD.splits
rdds = null
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 4b2570fa2b..33d35b35d1 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -575,7 +575,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
- val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList
+ val rddPrefs = rdd.getPreferredLocations(rdd.splits(partition)).toList
if (rddPrefs != Nil) {
return rddPrefs
}
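
This one-line change is the payoff of the preferredLocations split: subclasses override only preferredLocations(), while the scheduler goes through getPreferredLocations(), which handles the checkpointed case in exactly one place. A simplified sketch of the shape, with stand-in types (String for Split, a hard-coded checkpoint location; none of these names are from the patch):

    // Simplified sketch of the preferred-locations split introduced by this patch.
    abstract class Node {
      /** Overridden by subclasses; never needs to know about checkpointing. */
      def preferredLocations(split: String): Seq[String] = Nil

      def isCheckpointed: Boolean = false
      protected def checkpointLocations(split: String): Seq[String] = Seq("hdfs-node-1")

      /** Single scheduler-facing entry point; handles the checkpointed case once. */
      final def getPreferredLocations(split: String): Seq[String] =
        if (isCheckpointed) checkpointLocations(split) else preferredLocations(split)
    }

    class InputNode extends Node {
      override def preferredLocations(split: String) = Seq("worker-" + split)
    }
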
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 8622ce92aa..2cafef444c 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -41,7 +41,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
assert(parCollection.dependencies === Nil)
val result = parCollection.collect()
sleep(parCollection) // slightly extra time as loading classes for the first time can take some time
- assert(sc.objectFile[Int](parCollection.checkpointFile).collect() === result)
+ assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
assert(parCollection.collect() === result)
}
@@ -54,7 +54,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
blockRDD.checkpoint()
val result = blockRDD.collect()
sleep(blockRDD)
- assert(sc.objectFile[String](blockRDD.checkpointFile).collect() === result)
+ assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
assert(blockRDD.dependencies != Nil)
assert(blockRDD.collect() === result)
}
@@ -122,35 +122,6 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
"CoGroupedSplits still holds on to the splits of its parent RDDs")
}
- /**
- * This test forces two ResultTasks of the same job to be launched before and after
- * the checkpointing of job's RDD is completed.
- */
- test("Threading - ResultTasks") {
- val op1 = (parCollection: RDD[Int]) => {
- parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) })
- }
- val op2 = (firstRDD: RDD[(Int, Int)]) => {
- firstRDD.map(x => { println("2nd map running on " + x); Thread.sleep(500); x })
- }
- testThreading(op1, op2)
- }
-
- /**
- * This test forces two ShuffleMapTasks of the same job to be launched before and after
- * the checkpointing of job's RDD is completed.
- */
- test("Threading - ShuffleMapTasks") {
- val op1 = (parCollection: RDD[Int]) => {
- parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) })
- }
- val op2 = (firstRDD: RDD[(Int, Int)]) => {
- firstRDD.groupByKey(2).map(x => { println("2nd map running on " + x); Thread.sleep(500); x })
- }
- testThreading(op1, op2)
- }
-
-
def testCheckpointing[U: ClassManifest](op: (RDD[Int]) => RDD[U], sleepTime: Long = 500) {
val parCollection = sc.makeRDD(1 to 4, 4)
val operatedRDD = op(parCollection)
@@ -159,49 +130,11 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
val result = operatedRDD.collect()
sleep(operatedRDD)
//println(parentRDD + ", " + operatedRDD.dependencies.head.rdd )
- assert(sc.objectFile[U](operatedRDD.checkpointFile).collect() === result)
+ assert(sc.objectFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
assert(operatedRDD.dependencies.head.rdd != parentRDD)
assert(operatedRDD.collect() === result)
}
- def testThreading[U: ClassManifest, V: ClassManifest](op1: (RDD[Int]) => RDD[U], op2: (RDD[U]) => RDD[V]) {
-
- val parCollection = sc.makeRDD(1 to 2, 2)
-
- // This is the RDD that is to be checkpointed
- val firstRDD = op1(parCollection)
- val parentRDD = firstRDD.dependencies.head.rdd
- firstRDD.checkpoint()
-
- // This the RDD that uses firstRDD. This is designed to launch a
- // ShuffleMapTask that uses firstRDD.
- val secondRDD = op2(firstRDD)
-
- // Starting first job, to initiate the checkpointing
- logInfo("\nLaunching 1st job to initiate checkpointing\n")
- firstRDD.collect()
-
- // Checkpointing has started but not completed yet
- Thread.sleep(100)
- assert(firstRDD.dependencies.head.rdd === parentRDD)
-
- // Starting second job; first task of this job will be
- // launched _before_ firstRDD is marked as checkpointed
- // and the second task will be launched _after_ firstRDD
- // is marked as checkpointed
- logInfo("\nLaunching 2nd job that is designed to launch tasks " +
- "before and after checkpointing is complete\n")
- val result = secondRDD.collect()
-
- // Check whether firstRDD has been successfully checkpointed
- assert(firstRDD.dependencies.head.rdd != parentRDD)
-
- logInfo("\nRecomputing 2nd job to verify the results of the previous computation\n")
- // Check whether the result in the previous job was correct or not
- val correctResult = secondRDD.collect()
- assert(result === correctResult)
- }
-
def sleep(rdd: RDD[_]) {
val startTime = System.currentTimeMillis()
val maxWaitTime = 5000
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d2e9de110e..d290c5927e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -325,8 +325,9 @@ extends Serializable with Logging {
logInfo("Updating checkpoint data for time " + currentTime)
// Get the checkpointed RDDs from the generated RDDs
- val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
- .map(x => (x._1, x._2.getCheckpointData()))
+
+ val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
// Make a copy of the existing checkpoint data
val oldCheckpointData = checkpointData.clone()
@@ -373,7 +374,7 @@ extends Serializable with Logging {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
val rdd = ssc.sc.objectFile[T](data.toString)
// Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointFile = data.toString
+ rdd.checkpointData.cpFile = Some(data.toString)
generatedRDDs += ((time, rdd))
}
}
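
The streaming change replaces a nullable checkpointFile with Option[String], so updateCheckpointData can filter on isDefined instead of comparing against null. A small stub-based sketch of that idiom (StubRDD is invented for illustration; it is not a Spark type):

    // Stub-based sketch of the Option idiom in updateCheckpointData: only RDDs
    // that actually have a checkpoint file contribute a (time -> file) entry.
    case class StubRDD(id: Int, getCheckpointFile: Option[String])

    val generatedRDDs = Map(
      1L -> StubRDD(0, Some("/ckpt/rdd-0")),
      2L -> StubRDD(1, None)             // not checkpointed, filtered out below
    )

    val newCheckpointData = generatedRDDs
      .filter { case (_, rdd) => rdd.getCheckpointFile.isDefined }
      .map    { case (t, rdd) => (t, rdd.getCheckpointFile.get) }

    assert(newCheckpointData == Map(1L -> "/ckpt/rdd-0"))
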