From 21a08529768a5073bc5c15b6c2642ceef2acd0d5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 4 Dec 2012 22:10:25 -0800 Subject: Refactored RDD checkpointing to minimize extra fields in RDD class. --- core/src/main/scala/spark/RDD.scala | 149 ++++++++------------- core/src/main/scala/spark/RDDCheckpointData.scala | 68 ++++++++++ core/src/main/scala/spark/rdd/BlockRDD.scala | 9 +- core/src/main/scala/spark/rdd/CartesianRDD.scala | 10 +- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 2 +- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 2 +- core/src/main/scala/spark/rdd/HadoopRDD.scala | 2 - core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 2 - core/src/main/scala/spark/rdd/UnionRDD.scala | 12 +- .../main/scala/spark/scheduler/DAGScheduler.scala | 2 +- core/src/test/scala/spark/CheckpointSuite.scala | 73 +--------- .../src/main/scala/spark/streaming/DStream.scala | 7 +- 12 files changed, 144 insertions(+), 194 deletions(-) create mode 100644 core/src/main/scala/spark/RDDCheckpointData.scala diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index fbfcfbd704..e9bd131e61 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -99,13 +99,7 @@ abstract class RDD[T: ClassManifest]( val partitioner: Option[Partitioner] = None /** Optionally overridden by subclasses to specify placement preferences. */ - def preferredLocations(split: Split): Seq[String] = { - if (isCheckpointed) { - checkpointRDD.preferredLocations(split) - } else { - Nil - } - } + def preferredLocations(split: Split): Seq[String] = Nil /** The [[spark.SparkContext]] that this RDD was created on. */ def context = sc @@ -118,6 +112,8 @@ abstract class RDD[T: ClassManifest]( // Variables relating to persistence private var storageLevel: StorageLevel = StorageLevel.NONE + protected[spark] val checkpointData = new RDDCheckpointData(this) + /** Returns the first parent RDD */ protected[spark] def firstParent[U: ClassManifest] = { dependencies.head.rdd.asInstanceOf[RDD[U]] @@ -126,17 +122,6 @@ abstract class RDD[T: ClassManifest]( /** Returns the `i` th parent RDD */ protected[spark] def parent[U: ClassManifest](i: Int) = dependencies(i).rdd.asInstanceOf[RDD[U]] - // Variables relating to checkpointing - protected val isCheckpointable = true // override to set this to false to avoid checkpointing an RDD - - protected var shouldCheckpoint = false // set to true when an RDD is marked for checkpointing - protected var isCheckpointInProgress = false // set to true when checkpointing is in progress - protected[spark] var isCheckpointed = false // set to true after checkpointing is completed - - protected[spark] var checkpointFile: String = null // set to the checkpoint file after checkpointing is completed - protected var checkpointRDD: RDD[T] = null // set to the HadoopRDD of the checkpoint file - protected var checkpointRDDSplits: Seq[Split] = null // set to the splits of the Hadoop RDD - // Methods available on all RDDs: /** @@ -162,83 +147,14 @@ abstract class RDD[T: ClassManifest]( /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel - /** - * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir` - * (set using setCheckpointDir()) and all references to its parent RDDs will be removed. - * This is used to truncate very long lineages. 
In the current implementation, Spark will save - * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done. - * Hence, it is strongly recommended to use checkpoint() on RDDs when - * (i) Checkpoint() is called before the any job has been executed on this RDD. - * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will - * require recomputation. - */ - protected[spark] def checkpoint() { - synchronized { - if (isCheckpointed || shouldCheckpoint || isCheckpointInProgress) { - // do nothing - } else if (isCheckpointable) { - if (sc.checkpointDir == null) { - throw new Exception("Checkpoint directory has not been set in the SparkContext.") - } - shouldCheckpoint = true - } else { - throw new Exception(this + " cannot be checkpointed") - } - } - } - - def getCheckpointData(): Any = { - synchronized { - checkpointFile - } - } - - /** - * Performs the checkpointing of this RDD by saving this . It is called by the DAGScheduler after a job - * using this RDD has completed (therefore the RDD has been materialized and - * potentially stored in memory). In case this RDD is not marked for checkpointing, - * doCheckpoint() is called recursively on the parent RDDs. - */ - private[spark] def doCheckpoint() { - val startCheckpoint = synchronized { - if (isCheckpointable && shouldCheckpoint && !isCheckpointInProgress) { - isCheckpointInProgress = true - true - } else { - false - } - } - - if (startCheckpoint) { - val rdd = this - rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString - rdd.saveAsObjectFile(checkpointFile) - rdd.synchronized { - rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size) - rdd.checkpointRDDSplits = rdd.checkpointRDD.splits - rdd.changeDependencies(rdd.checkpointRDD) - rdd.shouldCheckpoint = false - rdd.isCheckpointInProgress = false - rdd.isCheckpointed = true - println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD) - } + def getPreferredLocations(split: Split) = { + if (isCheckpointed) { + checkpointData.preferredLocations(split) } else { - // Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked - dependencies.foreach(_.rdd.doCheckpoint()) + preferredLocations(split) } } - /** - * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]] - * (`newRDD`) created from the checkpoint file. This method must ensure that all references - * to the original parent RDDs must be removed to enable the parent RDDs to be garbage - * collected. Subclasses of RDD may override this method for implementing their own changing - * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea. - */ - protected def changeDependencies(newRDD: RDD[_]) { - dependencies_ = List(new OneToOneDependency(newRDD)) - } - /** * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. 
   * This should ''not'' be called by users directly, but is available for implementors of custom
@@ -247,7 +163,7 @@ abstract class RDD[T: ClassManifest](
   final def iterator(split: Split): Iterator[T] = {
     if (isCheckpointed) {
       // ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
-      checkpointRDD.iterator(checkpointRDDSplits(split.index))
+      checkpointData.iterator(split.index)
     } else if (storageLevel != StorageLevel.NONE) {
       SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
     } else {
@@ -589,6 +505,55 @@ abstract class RDD[T: ClassManifest](
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
   }
 
+  /**
+   * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
+   * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
+   * This is used to truncate very long lineages. In the current implementation, Spark will save
+   * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
+   * Hence, it is strongly recommended that:
+   * (i) checkpoint() is called before any job has been executed on this RDD, and
+   * (ii) this RDD has been persisted in memory; otherwise saving it to a file will
+   * require recomputation.
+   */
+  def checkpoint() {
+    checkpointData.markForCheckpoint()
+  }
+
+  /**
+   * Return whether this RDD has been checkpointed or not.
+   */
+  def isCheckpointed(): Boolean = {
+    checkpointData.isCheckpointed()
+  }
+
+  /**
+   * Gets the name of the file to which this RDD was checkpointed.
+   */
+  def getCheckpointFile(): Option[String] = {
+    checkpointData.getCheckpointFile()
+  }
+
+  /**
+   * Performs the checkpointing of this RDD by saving it to a file. It is called by the DAGScheduler
+   * after a job using this RDD has completed (therefore the RDD has been materialized and
+   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   */
+  protected[spark] def doCheckpoint() {
+    checkpointData.doCheckpoint()
+    dependencies.foreach(_.rdd.doCheckpoint())
+  }
+
+  /**
+   * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]]
+   * (`newRDD`) created from the checkpoint file. This method must ensure that all references
+   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
+   * collected. Subclasses of RDD may override this method to implement their own changing
+   * logic. See [[spark.rdd.UnionRDD]] and [[spark.rdd.ShuffledRDD]] to get a better idea.
+ */ + protected[spark] def changeDependencies(newRDD: RDD[_]) { + dependencies_ = List(new OneToOneDependency(newRDD)) + } + @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { synchronized { diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala new file mode 100644 index 0000000000..eb4482acee --- /dev/null +++ b/core/src/main/scala/spark/RDDCheckpointData.scala @@ -0,0 +1,68 @@ +package spark + +import org.apache.hadoop.fs.Path + + + +private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T]) +extends Serializable { + + class CheckpointState extends Serializable { + var state = 0 + + def mark() { if (state == 0) state = 1 } + def start() { assert(state == 1); state = 2 } + def finish() { assert(state == 2); state = 3 } + + def isMarked() = { state == 1 } + def isInProgress = { state == 2 } + def isCheckpointed = { state == 3 } + } + + val cpState = new CheckpointState() + var cpFile: Option[String] = None + var cpRDD: Option[RDD[T]] = None + var cpRDDSplits: Seq[Split] = Nil + + def markForCheckpoint() = { + rdd.synchronized { cpState.mark() } + } + + def isCheckpointed() = { + rdd.synchronized { cpState.isCheckpointed } + } + + def getCheckpointFile() = { + rdd.synchronized { cpFile } + } + + def doCheckpoint() { + rdd.synchronized { + if (cpState.isMarked && !cpState.isInProgress) { + cpState.start() + } else { + return + } + } + + val file = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString + rdd.saveAsObjectFile(file) + val newRDD = rdd.context.objectFile[T](file, rdd.splits.size) + + rdd.synchronized { + rdd.changeDependencies(newRDD) + cpFile = Some(file) + cpRDD = Some(newRDD) + cpRDDSplits = newRDD.splits + cpState.finish() + } + } + + def preferredLocations(split: Split) = { + cpRDD.get.preferredLocations(split) + } + + def iterator(splitIndex: Int): Iterator[T] = { + cpRDD.get.iterator(cpRDDSplits(splitIndex)) + } +} diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index f4c3f99011..590f9eb738 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -41,12 +41,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St } } - override def preferredLocations(split: Split) = { - if (isCheckpointed) { - checkpointRDD.preferredLocations(split) - } else { - locations_(split.asInstanceOf[BlockRDDSplit].blockId) - } - } + override def preferredLocations(split: Split) = + locations_(split.asInstanceOf[BlockRDDSplit].blockId) } diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 458ad38d55..9bfc3f8ca3 100644 --- a/core/src/main/scala/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -32,12 +32,8 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( override def splits = splits_ override def preferredLocations(split: Split) = { - if (isCheckpointed) { - checkpointRDD.preferredLocations(split) - } else { - val currSplit = split.asInstanceOf[CartesianSplit] - rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) - } + val currSplit = split.asInstanceOf[CartesianSplit] + rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2) } override def compute(split: Split) = { @@ -56,7 +52,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( override def dependencies = deps_ - override protected 
def changeDependencies(newRDD: RDD[_]) { + override def changeDependencies(newRDD: RDD[_]) { deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]])) splits_ = newRDD.splits rdd1 = null diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 94ef1b56e8..adfecea966 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -112,7 +112,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) map.iterator } - override protected def changeDependencies(newRDD: RDD[_]) { + override def changeDependencies(newRDD: RDD[_]) { deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]])) splits_ = newRDD.splits rdds = null diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 5b5f72ddeb..90c3b8bfd8 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -48,7 +48,7 @@ class CoalescedRDD[T: ClassManifest]( override def dependencies = deps_ - override protected def changeDependencies(newRDD: RDD[_]) { + override def changeDependencies(newRDD: RDD[_]) { deps_ = List(new OneToOneDependency(newRDD)) splits_ = newRDD.splits prev = null diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index 19ed56d9c0..a12531ea89 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -115,6 +115,4 @@ class HadoopRDD[K, V]( val hadoopSplit = split.asInstanceOf[HadoopSplit] hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost") } - - override val isCheckpointable = false } diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 2875abb2db..c12df5839e 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -93,6 +93,4 @@ class NewHadoopRDD[K, V]( val theSplit = split.asInstanceOf[NewHadoopSplit] theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost") } - - override val isCheckpointable = false } diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 643a174160..30eb8483b6 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -49,15 +49,11 @@ class UnionRDD[T: ClassManifest]( override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator() - override def preferredLocations(s: Split): Seq[String] = { - if (isCheckpointed) { - checkpointRDD.preferredLocations(s) - } else { - s.asInstanceOf[UnionSplit[T]].preferredLocations() - } - } + override def preferredLocations(s: Split): Seq[String] = + s.asInstanceOf[UnionSplit[T]].preferredLocations() + - override protected def changeDependencies(newRDD: RDD[_]) { + override def changeDependencies(newRDD: RDD[_]) { deps_ = List(new OneToOneDependency(newRDD)) splits_ = newRDD.splits rdds = null diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 4b2570fa2b..33d35b35d1 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -575,7 +575,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with return cached } // If 
the RDD has some placement preferences (as is the case for input RDDs), get those - val rddPrefs = rdd.preferredLocations(rdd.splits(partition)).toList + val rddPrefs = rdd.getPreferredLocations(rdd.splits(partition)).toList if (rddPrefs != Nil) { return rddPrefs } diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 8622ce92aa..2cafef444c 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -41,7 +41,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { assert(parCollection.dependencies === Nil) val result = parCollection.collect() sleep(parCollection) // slightly extra time as loading classes for the first can take some time - assert(sc.objectFile[Int](parCollection.checkpointFile).collect() === result) + assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result) assert(parCollection.dependencies != Nil) assert(parCollection.collect() === result) } @@ -54,7 +54,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { blockRDD.checkpoint() val result = blockRDD.collect() sleep(blockRDD) - assert(sc.objectFile[String](blockRDD.checkpointFile).collect() === result) + assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result) assert(blockRDD.dependencies != Nil) assert(blockRDD.collect() === result) } @@ -122,35 +122,6 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { "CoGroupedSplits still holds on to the splits of its parent RDDs") } - /** - * This test forces two ResultTasks of the same job to be launched before and after - * the checkpointing of job's RDD is completed. - */ - test("Threading - ResultTasks") { - val op1 = (parCollection: RDD[Int]) => { - parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) }) - } - val op2 = (firstRDD: RDD[(Int, Int)]) => { - firstRDD.map(x => { println("2nd map running on " + x); Thread.sleep(500); x }) - } - testThreading(op1, op2) - } - - /** - * This test forces two ShuffleMapTasks of the same job to be launched before and after - * the checkpointing of job's RDD is completed. 
- */ - test("Threading - ShuffleMapTasks") { - val op1 = (parCollection: RDD[Int]) => { - parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) }) - } - val op2 = (firstRDD: RDD[(Int, Int)]) => { - firstRDD.groupByKey(2).map(x => { println("2nd map running on " + x); Thread.sleep(500); x }) - } - testThreading(op1, op2) - } - - def testCheckpointing[U: ClassManifest](op: (RDD[Int]) => RDD[U], sleepTime: Long = 500) { val parCollection = sc.makeRDD(1 to 4, 4) val operatedRDD = op(parCollection) @@ -159,49 +130,11 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { val result = operatedRDD.collect() sleep(operatedRDD) //println(parentRDD + ", " + operatedRDD.dependencies.head.rdd ) - assert(sc.objectFile[U](operatedRDD.checkpointFile).collect() === result) + assert(sc.objectFile[U](operatedRDD.getCheckpointFile.get).collect() === result) assert(operatedRDD.dependencies.head.rdd != parentRDD) assert(operatedRDD.collect() === result) } - def testThreading[U: ClassManifest, V: ClassManifest](op1: (RDD[Int]) => RDD[U], op2: (RDD[U]) => RDD[V]) { - - val parCollection = sc.makeRDD(1 to 2, 2) - - // This is the RDD that is to be checkpointed - val firstRDD = op1(parCollection) - val parentRDD = firstRDD.dependencies.head.rdd - firstRDD.checkpoint() - - // This the RDD that uses firstRDD. This is designed to launch a - // ShuffleMapTask that uses firstRDD. - val secondRDD = op2(firstRDD) - - // Starting first job, to initiate the checkpointing - logInfo("\nLaunching 1st job to initiate checkpointing\n") - firstRDD.collect() - - // Checkpointing has started but not completed yet - Thread.sleep(100) - assert(firstRDD.dependencies.head.rdd === parentRDD) - - // Starting second job; first task of this job will be - // launched _before_ firstRDD is marked as checkpointed - // and the second task will be launched _after_ firstRDD - // is marked as checkpointed - logInfo("\nLaunching 2nd job that is designed to launch tasks " + - "before and after checkpointing is complete\n") - val result = secondRDD.collect() - - // Check whether firstRDD has been successfully checkpointed - assert(firstRDD.dependencies.head.rdd != parentRDD) - - logInfo("\nRecomputing 2nd job to verify the results of the previous computation\n") - // Check whether the result in the previous job was correct or not - val correctResult = secondRDD.collect() - assert(result === correctResult) - } - def sleep(rdd: RDD[_]) { val startTime = System.currentTimeMillis() val maxWaitTime = 5000 diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index d2e9de110e..d290c5927e 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -325,8 +325,9 @@ extends Serializable with Logging { logInfo("Updating checkpoint data for time " + currentTime) // Get the checkpointed RDDs from the generated RDDs - val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null) - .map(x => (x._1, x._2.getCheckpointData())) + + val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) // Make a copy of the existing checkpoint data val oldCheckpointData = checkpointData.clone() @@ -373,7 +374,7 @@ extends Serializable with Logging { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") val rdd = ssc.sc.objectFile[T](data.toString) // Set the 
checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData() - rdd.checkpointFile = data.toString + rdd.checkpointData.cpFile = Some(data.toString) generatedRDDs += ((time, rdd)) } } -- cgit v1.2.3
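
The user-facing contract introduced above is: mark an RDD with checkpoint(), run a job on it so that the DAGScheduler invokes doCheckpoint() afterwards, and only then query isCheckpointed()/getCheckpointFile(). The following is a minimal usage sketch against the API as it appears in this patch; the SparkContext constructor arguments and the checkpoint directory path are illustrative assumptions, not values taken from the change.

    import spark.SparkContext

    object CheckpointSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CheckpointSketch")
        sc.setCheckpointDir("/tmp/spark-checkpoints")  // assumed path; set via setCheckpointDir() as the doc comment says

        // Persist first, as the doc comment recommends, so that writing the
        // checkpoint file does not force a recomputation of the whole lineage.
        val rdd = sc.makeRDD(1 to 1000, 4).map(_ * 2).cache()
        rdd.checkpoint()   // only marks the RDD; nothing is written yet

        rdd.count()        // first job; doCheckpoint() runs after it completes

        println("checkpointed: " + rdd.isCheckpointed())
        println("file: " + rdd.getCheckpointFile.getOrElse("<none>"))
        sc.stop()
      }
    }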
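
Internally, RDDCheckpointData drives this through a small integer-coded state machine (0 = initial, 1 = marked, 2 = checkpointing in progress, 3 = checkpointed). The sketch below restates those transitions with named states; the state names and event strings are illustrative only and do not appear in the patch.

    // Named restatement of RDDCheckpointData.CheckpointState (illustration, not patch code).
    object CheckpointLifecycle {
      sealed trait State
      case object Initial      extends State  // state == 0: checkpoint() not yet called
      case object Marked       extends State  // state == 1: markForCheckpoint() ran
      case object InProgress   extends State  // state == 2: doCheckpoint() is writing the file
      case object Checkpointed extends State  // state == 3: file written, dependencies swapped

      // mark()/start()/finish() from the patch, expressed as one transition function.
      def next(state: State, event: String): State = (state, event) match {
        case (Initial,    "mark")   => Marked        // mark() is a no-op in every other state
        case (Marked,     "start")  => InProgress    // taken inside rdd.synchronized in doCheckpoint()
        case (InProgress, "finish") => Checkpointed  // set after changeDependencies(newRDD)
        case (s, _)                 => s             // this sketch ignores illegal events; the patch asserts on bad start()/finish()
      }
    }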