author    Tathagata Das <tathagata.das1565@gmail.com>  2012-12-11 15:36:12 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-12-11 15:36:12 -0800
commit    8e74fac215e8b9cda7e35111c5116e3669c6eb97 (patch)
tree      cb460c247109d8028aadc1f7d112d35f4f204ffc
parent    fa28f25619d6712e5f920f498ec03085ea208b4d (diff)
Made checkpoint data in RDDs optional to further reduce serialized size.
-rw-r--r--  core/src/main/scala/spark/RDD.scala                      | 19
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala             | 11
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala          | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala   |  4
4 files changed, 29 insertions(+), 17 deletions(-)
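
The serialized-size saving described in the commit message comes from shipping a None in place of a full RDDCheckpointData instance for RDDs that were never checkpointed. Below is a minimal, self-contained Scala sketch of that effect; FakeCheckpointData, EagerRdd, OptionalRdd and SerializedSizeSketch are hypothetical stand-ins for illustration only, not the real Spark classes.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical stand-in for RDDCheckpointData (not the real Spark class).
    class FakeCheckpointData extends Serializable {
      var cpFile: Option[String] = None
      var cpState: Int = 0
    }

    // Before this change: every RDD carries a checkpoint-data object,
    // even when checkpoint() is never called.
    case class EagerRdd(id: Int, checkpointData: FakeCheckpointData)

    // After this change: the field stays None until checkpoint() is called.
    case class OptionalRdd(id: Int, checkpointData: Option[FakeCheckpointData])

    object SerializedSizeSketch extends App {
      // Measure the Java-serialized size of an object graph.
      def serializedSize(obj: AnyRef): Int = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(obj)
        out.close()
        buffer.size()
      }

      // The eager variant pays for the nested object on every serialization;
      // the optional variant only pays for the None marker.
      println("eager:    " + serializedSize(EagerRdd(1, new FakeCheckpointData)))
      println("optional: " + serializedSize(OptionalRdd(1, None)))
    }

With plain Java serialization the Option-based variant should come out smaller per object, and that difference is paid on every serialized RDD in a lineage, which is why the commit only populates checkpointData when checkpoint() is actually invoked.
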
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index efa03d5185..6c04769c82 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -112,7 +112,7 @@ abstract class RDD[T: ClassManifest](
// Variables relating to persistence
private var storageLevel: StorageLevel = StorageLevel.NONE
- protected[spark] val checkpointData = new RDDCheckpointData(this)
+ protected[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassManifest] = {
@@ -149,7 +149,7 @@ abstract class RDD[T: ClassManifest](
def getPreferredLocations(split: Split) = {
if (isCheckpointed) {
- checkpointData.preferredLocations(split)
+ checkpointData.get.preferredLocations(split)
} else {
preferredLocations(split)
}
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassManifest](
final def iterator(split: Split): Iterator[T] = {
if (isCheckpointed) {
// ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
- checkpointData.iterator(split)
+ checkpointData.get.iterator(split)
} else if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
} else {
@@ -516,21 +516,24 @@ abstract class RDD[T: ClassManifest](
* require recomputation.
*/
def checkpoint() {
- checkpointData.markForCheckpoint()
+ if (checkpointData.isEmpty) {
+ checkpointData = Some(new RDDCheckpointData(this))
+ checkpointData.get.markForCheckpoint()
+ }
}
/**
* Return whether this RDD has been checkpointed or not
*/
def isCheckpointed(): Boolean = {
- checkpointData.isCheckpointed()
+ if (checkpointData.isDefined) checkpointData.get.isCheckpointed() else false
}
/**
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Option[String] = {
- checkpointData.getCheckpointFile()
+ if (checkpointData.isDefined) checkpointData.get.getCheckpointFile() else None
}
/**
@@ -539,12 +542,12 @@ abstract class RDD[T: ClassManifest](
* potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
*/
protected[spark] def doCheckpoint() {
- checkpointData.doCheckpoint()
+ if (checkpointData.isDefined) checkpointData.get.doCheckpoint()
dependencies.foreach(_.rdd.doCheckpoint())
}
/**
- * Changes the dependencies of this RDD from its original parents to the new [[spark.rdd.HadoopRDD]]
+ * Changes the dependencies of this RDD from its original parents to the new RDD
* (`newRDD`) created from the checkpoint file. This method must ensure that all references
* to the original parent RDDs must be removed to enable the parent RDDs to be garbage
* collected. Subclasses of RDD may override this method for implementing their own changing
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 654b1c2eb7..71ed4ef058 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -366,6 +366,17 @@ class SparkContext(
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
}
+
+ protected[spark] def checkpointFile[T: ClassManifest](
+ path: String,
+ minSplits: Int = defaultMinSplits
+ ): RDD[T] = {
+ val rdd = objectFile[T](path, minSplits)
+ rdd.checkpointData = Some(new RDDCheckpointData(rdd))
+ rdd.checkpointData.get.cpFile = Some(path)
+ rdd
+ }
+
/** Build the union of a list of RDDs. */
def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 909c55c91c..0bffedb8db 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -57,7 +57,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
assert(parCollection.splits.length === numSplits)
- assert(parCollection.splits.toList === parCollection.checkpointData.cpRDDSplits.toList)
+ assert(parCollection.splits.toList === parCollection.checkpointData.get.cpRDDSplits.toList)
assert(parCollection.collect() === result)
}
@@ -72,7 +72,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
assert(blockRDD.dependencies != Nil)
assert(blockRDD.splits.length === numSplits)
- assert(blockRDD.splits.toList === blockRDD.checkpointData.cpRDDSplits.toList)
+ assert(blockRDD.splits.toList === blockRDD.checkpointData.get.cpRDDSplits.toList)
assert(blockRDD.collect() === result)
}
@@ -84,7 +84,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
}
test("UnionRDD") {
- def otherRDD = sc.makeRDD(1 to 10, 4)
+ def otherRDD = sc.makeRDD(1 to 10, 1)
// Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed.
// Current implementation of UnionRDD has transient reference to parent RDDs,
@@ -191,7 +191,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
assert(operatedRDD.dependencies.head.rdd != parentRDD)
// Test whether the splits have been changed to the new Hadoop splits
- assert(operatedRDD.splits.toList === operatedRDD.checkpointData.cpRDDSplits.toList)
+ assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.cpRDDSplits.toList)
// Test whether the number of splits is same as before
assert(operatedRDD.splits.length === numSplits)
@@ -289,8 +289,8 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
*/
def generateLongLineageRDD(): RDD[Int] = {
var rdd = sc.makeRDD(1 to 100, 4)
- for (i <- 1 to 20) {
- rdd = rdd.map(x => x)
+ for (i <- 1 to 50) {
+ rdd = rdd.map(x => x + 1)
}
rdd
}
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index d290c5927e..69fefa21a0 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -372,9 +372,7 @@ extends Serializable with Logging {
checkpointData.foreach {
case(time, data) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.objectFile[T](data.toString)
- // Set the checkpoint file name to identify this RDD as a checkpointed RDD by updateCheckpointData()
- rdd.checkpointData.cpFile = Some(data.toString)
+ val rdd = ssc.sc.checkpointFile[T](data.toString)
generatedRDDs += ((time, rdd))
}
}