author     Tathagata Das <tathagata.das1565@gmail.com>    2012-12-11 15:36:12 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2012-12-11 15:36:12 -0800
commit     8e74fac215e8b9cda7e35111c5116e3669c6eb97
tree       cb460c247109d8028aadc1f7d112d35f4f204ffc
parent     fa28f25619d6712e5f920f498ec03085ea208b4d
Made checkpoint data in RDDs optional to further reduce serialized size.
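The commit's reasoning: previously every RDD eagerly carried a live `RDDCheckpointData` instance, which rode along whenever the RDD was serialized with its lineage; with an `Option` that stays `None` until `checkpoint()` is first called, un-checkpointed RDDs pay only for the empty reference. A rough standalone sketch of that effect using plain Java serialization (`Node`, `Helper`, and `SerializedSizeSketch` are made-up names for illustration; Spark's real serialized RDD graphs also include closures and dependencies):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializedSizeSketch {
  case class Helper(buffer: Array[Byte])            // stand-in for RDDCheckpointData
  case class Node(id: Int, helper: Option[Helper])  // stand-in for an RDD

  // Serialize an object with Java serialization and report its byte count.
  def sizeOf(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val lean   = Node(1, None)                                 // never checkpointed
    val loaded = Node(2, Some(Helper(new Array[Byte](1024))))  // checkpoint requested
    // The None case serializes to a fraction of the Some case's size.
    println(s"None: ${sizeOf(lean)} bytes, Some: ${sizeOf(loaded)} bytes")
  }
}
```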
-rw-r--r--  core/src/main/scala/spark/RDD.scala                      | 19
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala             | 11
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala          | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala   |  4
4 files changed, 29 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index efa03d5185..6c04769c82 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -112,7 +112,7 @@ abstract class RDD[T: ClassManifest](
   // Variables relating to persistence
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
-  protected[spark] val checkpointData = new RDDCheckpointData(this)
+  protected[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
   /** Returns the first parent RDD */
   protected[spark] def firstParent[U: ClassManifest] = {
@@ -149,7 +149,7 @@ abstract class RDD[T: ClassManifest](
 
   def getPreferredLocations(split: Split) = {
     if (isCheckpointed) {
-      checkpointData.preferredLocations(split)
+      checkpointData.get.preferredLocations(split)
     } else {
       preferredLocations(split)
     }
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassManifest](
   final def iterator(split: Split): Iterator[T] = {
     if (isCheckpointed) {
       // ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
-      checkpointData.iterator(split)
+      checkpointData.get.iterator(split)
     } else if (storageLevel != StorageLevel.NONE) {
       SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
     } else {
@@ -516,21 +516,24 @@ abstract class RDD[T: ClassManifest](
    * require recomputation.
    */
   def checkpoint() {
-    checkpointData.markForCheckpoint()
+    if (checkpointData.isEmpty) {
+      checkpointData = Some(new RDDCheckpointData(this))
+      checkpointData.get.markForCheckpoint()
+    }
   }
 
   /**
    * Return whether this RDD has been checkpointed or not
    */
   def isCheckpointed(): Boolean = {
-    checkpointData.isCheckpointed()
+    if (checkpointData.isDefined) checkpointData.get.isCheckpointed() else false
   }
 
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
   def getCheckpointFile(): Option[String] = {
-    checkpointData.getCheckpointFile()
+    if (checkpointData.isDefined) checkpointData.get.getCheckpointFile() else None
   }
 
   /**
@@ -539,12 +542,12 @@ abstract class RDD[T: ClassManifest](
    * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
    */
   protected[spark] def doCheckpoint() {
-    checkpointData.doCheckpoint()
+    if (checkpointData.isDefined) checkpointData.get.doCheckpoint()
     dependencies.foreach(_.rdd.doCheckpoint())
   }
 
   /**
-   * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]]
+   * Changes the dependencies of this RDD from its original parents to the new RDD
    * (`newRDD`) created from the checkpoint file. This method must ensure that all references
    * to the original parent RDDs must be removed to enable the parent RDDs to be garbage
    * collected. Subclasses of RDD may override this method for implementing their own changing
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 654b1c2eb7..71ed4ef058 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -366,6 +366,17 @@ class SparkContext(
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
   }
 
+
+  protected[spark] def checkpointFile[T: ClassManifest](
+      path: String,
+      minSplits: Int = defaultMinSplits
+    ): RDD[T] = {
+    val rdd = objectFile[T](path, minSplits)
+    rdd.checkpointData = Some(new RDDCheckpointData(rdd))
+    rdd.checkpointData.get.cpFile = Some(path)
+    rdd
+  }
+
   /** Build the union of a list of RDDs. */
   def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 909c55c91c..0bffedb8db 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -57,7 +57,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
     assert(parCollection.dependencies != Nil)
     assert(parCollection.splits.length === numSplits)
-    assert(parCollection.splits.toList === parCollection.checkpointData.cpRDDSplits.toList)
+    assert(parCollection.splits.toList === parCollection.checkpointData.get.cpRDDSplits.toList)
     assert(parCollection.collect() === result)
   }
 
@@ -72,7 +72,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
     assert(blockRDD.dependencies != Nil)
     assert(blockRDD.splits.length === numSplits)
-    assert(blockRDD.splits.toList === blockRDD.checkpointData.cpRDDSplits.toList)
+    assert(blockRDD.splits.toList === blockRDD.checkpointData.get.cpRDDSplits.toList)
     assert(blockRDD.collect() === result)
   }
 
@@ -84,7 +84,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
   }
 
   test("UnionRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 4)
+    def otherRDD = sc.makeRDD(1 to 10, 1)
 
     // Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed.
     // Current implementation of UnionRDD has transient reference to parent RDDs,
@@ -191,7 +191,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
 
     // Test whether the splits have been changed to the new Hadoop splits
-    assert(operatedRDD.splits.toList === operatedRDD.checkpointData.cpRDDSplits.toList)
+    assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.cpRDDSplits.toList)
 
     // Test whether the number of splits is same as before
     assert(operatedRDD.splits.length === numSplits)
@@ -289,8 +289,8 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
    */
   def generateLongLineageRDD(): RDD[Int] = {
     var rdd = sc.makeRDD(1 to 100, 4)
-    for (i <- 1 to 20) {
-      rdd = rdd.map(x => x)
+    for (i <- 1 to 50) {
+      rdd = rdd.map(x => x + 1)
     }
     rdd
   }
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d290c5927e..69fefa21a0 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -372,9 +372,7 @@ extends Serializable with Logging {
     checkpointData.foreach {
       case(time, data) => {
         logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
-        val rdd = ssc.sc.objectFile[T](data.toString)
-        // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
-        rdd.checkpointData.cpFile = Some(data.toString)
+        val rdd = ssc.sc.checkpointFile[T](data.toString)
         generatedRDDs += ((time, rdd))
       }
     }
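The public checkpointing API is unchanged by this commit: the `Option` stays hidden behind `checkpoint()`, `isCheckpointed()`, and `getCheckpointFile()`. A usage sketch mirroring what CheckpointSuite does above, assuming a running 2012-era `SparkContext` named `sc` whose checkpoint directory has already been configured (and assuming, as the test suite does, that running a job triggers `doCheckpoint()`):

```scala
val parCollection = sc.makeRDD(1 to 100, 4)
val result = parCollection.collect()

parCollection.checkpoint()   // first call allocates checkpointData lazily
parCollection.collect()      // running a job lets doCheckpoint() write the file

// Once materialized, reads go through checkpointData.get instead of the lineage
assert(parCollection.isCheckpointed())
val path = parCollection.getCheckpointFile().get
assert(sc.objectFile[Int](path).collect().toList == result.toList)
```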