path: root/core
author     Tathagata Das <tathagata.das1565@gmail.com>  2013-01-19 23:50:17 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-01-19 23:50:17 -0800
commit     214345ceace634ec9cc83c4c85b233b699e0d219 (patch)
tree       0ba6c9f68bbdb64d6505c48bffc5954c066b4989 /core
parent     f466ee44bc65779f6bb9c1ba488a87edfc9b28a3 (diff)
Fixed issue https://spark-project.atlassian.net/browse/STREAMING-29, along with updates to doc comments in SparkContext.checkpoint().
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                17
-rw-r--r--  core/src/main/scala/spark/RDDCheckpointData.scala   2
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala        13
3 files changed, 16 insertions(+), 16 deletions(-)
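The behavioral change is easiest to see from the caller's side. Below is a minimal sketch of the new fail-fast contract; the local master string and the sample data are illustrative assumptions, not part of this patch:

import spark.SparkContext

val sc = new SparkContext("local", "checkpoint-demo")
val rdd = sc.parallelize(1 to 100)

// Before this patch, a missing checkpoint directory only surfaced later,
// inside RDDCheckpointData, when a Path was built from a null string.
// After this patch, the mistake is reported at the call site:
rdd.checkpoint()  // throws: "Checkpoint directory has not been set in the SparkContext"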
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index a9f2e86455..e0d2eabb1d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -549,17 +549,16 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
- * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
- * This is used to truncate very long lineages. In the current implementation, Spark will save
- * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
- * Hence, it is strongly recommended to use checkpoint() on RDDs when
- * (i) checkpoint() is called before the any job has been executed on this RDD.
- * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will
- * require recomputation.
+ * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
+ * directory set with SparkContext.setCheckpointDir() and all references to its parent
+ * RDDs will be removed. This function must be called before any job has been
+ * executed on this RDD. It is strongly recommended that this RDD is persisted in
+ * memory; otherwise saving it to a file will require recomputation.
*/
def checkpoint() {
- if (checkpointData.isEmpty) {
+ if (context.checkpointDir.isEmpty) {
+ throw new Exception("Checkpoint directory has not been set in the SparkContext")
+ } else if (checkpointData.isEmpty) {
checkpointData = Some(new RDDCheckpointData(this))
checkpointData.get.markForCheckpoint()
}
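Continuing the sketch above, this is the usage the rewritten comment recommends: set the directory first, persist, and mark the RDD before any job runs. The HDFS path and the transformation are hypothetical:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // hypothetical path; must be HDFS on a cluster

val pairs = sc.parallelize(1 to 1000).map(x => (x % 10, x))
pairs.cache()       // recommended: avoids recomputing the lineage when the file is written
pairs.checkpoint()  // must be called before any job is executed on this RDD

pairs.count()       // first job; Spark saves the RDD under the checkpoint directory after it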
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d845a522e4..18df530b7d 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -63,7 +63,7 @@ extends Logging with Serializable {
}
// Save to file, and reload it as an RDD
- val path = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString
+ val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
val newRDD = new CheckpointRDD[T](rdd.context, path)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 88cf357ebf..7f3259d982 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -184,7 +184,7 @@ class SparkContext(
private var dagScheduler = new DAGScheduler(taskScheduler)
- private[spark] var checkpointDir: String = null
+ private[spark] var checkpointDir: Option[String] = None
// Methods for creating RDDs
@@ -595,10 +595,11 @@ class SparkContext(
}
/**
- * Set the directory under which RDDs are going to be checkpointed. This method will
- * create this directory and will throw an exception of the path already exists (to avoid
- * overwriting existing files may be overwritten). The directory will be deleted on exit
- * if indicated.
+ * Set the directory under which RDDs are going to be checkpointed. The directory must
+ * be an HDFS path if running on a cluster. If the directory does not exist, it will
+ * be created. If the directory already exists and useExisting is set to true, then the
+ * existing directory will be used. Otherwise an exception will be thrown to
+ * prevent accidental overwriting of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val path = new Path(dir)
@@ -610,7 +611,7 @@ class SparkContext(
fs.mkdirs(path)
}
}
- checkpointDir = dir
+ checkpointDir = Some(dir)
}
/** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
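The null-to-Option[String] switch above is what makes the new guard in checkpoint() natural, and it is also why the bare .get in RDDCheckpointData is safe: that code only runs after the guard has passed. A standalone illustration of the pattern in plain Scala, not the actual Spark code:

var checkpointDir: Option[String] = None

def checkpoint(): Unit = checkpointDir match {
  case None =>
    throw new Exception("Checkpoint directory has not been set in the SparkContext")
  case Some(dir) =>
    println("would write checkpoint files under " + dir)  // stand-in for the real work
}

def setCheckpointDir(dir: String): Unit = {
  checkpointDir = Some(dir)
}

setCheckpointDir("/tmp/checkpoints")
checkpoint()  // succeeds instead of throwing, since the directory is now set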