-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala      | 17
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala | 17
-rw-r--r--  python/pyspark/context.py                                 | 11
-rw-r--r--  python/pyspark/rdd.py                                     | 17
4 files changed, 28 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 087270e46d..b3698ffa44 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -307,16 +307,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
-
- /**
- * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
- * (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
- * This is used to truncate very long lineages. In the current implementation, Spark will save
- * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
- * Hence, it is strongly recommended to use checkpoint() on RDDs when
- * (i) checkpoint() is called before the any job has been executed on this RDD.
- * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will
- * require recomputation.
+
+ /**
+ * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
+ * directory set with SparkContext.setCheckpointDir() and all references to its parent
+ * RDDs will be removed. This function must be called before any job has been
+ * executed on this RDD. It is strongly recommended that this RDD be persisted in
+ * memory; otherwise, saving it to a file will require recomputation.
*/
def checkpoint() = rdd.checkpoint()
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index fa2f14113d..14699961ad 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -357,20 +357,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
}
/**
- * Set the directory under which RDDs are going to be checkpointed. This method will
- * create this directory and will throw an exception of the path already exists (to avoid
- * overwriting existing files may be overwritten). The directory will be deleted on exit
- * if indicated.
+ * Set the directory under which RDDs are going to be checkpointed. The directory must
+ * be an HDFS path if running on a cluster. If the directory does not exist, it will
+ * be created. If the directory exists and useExisting is set to true, then the
+ * existing directory will be used. Otherwise, an exception will be thrown to
+ * prevent accidental overwriting of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean) {
sc.setCheckpointDir(dir, useExisting)
}
/**
- * Set the directory under which RDDs are going to be checkpointed. This method will
- * create this directory and will throw an exception of the path already exists (to avoid
- * overwriting existing files may be overwritten). The directory will be deleted on exit
- * if indicated.
+ * Set the directory under which RDDs are going to be checkpointed. The directory must
+ * be an HDFS path if running on a cluster. If the directory does not exist, it will
+ * be created. If the directory exists, an exception will be thrown to prevent accidental
+ * overwriting of checkpoint files.
*/
def setCheckpointDir(dir: String) {
sc.setCheckpointDir(dir)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8beb8e2ae9..dcbed37270 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -202,9 +202,12 @@ class SparkContext(object):
def setCheckpointDir(self, dirName, useExisting=False):
"""
- Set the directory under which RDDs are going to be checkpointed. This
- method will create this directory and will throw an exception of the
- path already exists (to avoid overwriting existing files may be
- overwritten). The directory will be deleted on exit if indicated.
+ Set the directory under which RDDs are going to be checkpointed. The
+ directory must be an HDFS path if running on a cluster.
+
+ If the directory does not exist, it will be created. If the directory
+ exists and C{useExisting} is set to true, then the existing directory
+ will be used. Otherwise, an exception will be thrown to prevent
+ accidental overwriting of checkpoint files in the existing directory.
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
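
For reference, a minimal PySpark sketch of the setCheckpointDir() behavior described above; the local master, app name, and /tmp path are illustrative, and on a cluster the directory would have to be an HDFS path:

    from pyspark import SparkContext

    sc = SparkContext("local", "checkpoint-demo")

    # Created if missing; throws if the directory already exists,
    # since useExisting defaults to False.
    sc.setCheckpointDir("/tmp/demo-checkpoints")

    # Explicitly opt in to reusing a directory that may already
    # contain checkpoint files from an earlier run.
    sc.setCheckpointDir("/tmp/demo-checkpoints", useExisting=True)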
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 2a2ff9b271..7b6ab956ee 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -52,18 +52,11 @@ class RDD(object):
def checkpoint(self):
"""
- Mark this RDD for checkpointing. The RDD will be saved to a file inside
- `checkpointDir` (set using setCheckpointDir()) and all references to
- its parent RDDs will be removed. This is used to truncate very long
- lineages. In the current implementation, Spark will save this RDD to
- a file (using saveAsObjectFile()) after the first job using this RDD is
- done. Hence, it is strongly recommended to use checkpoint() on RDDs
- when
-
- (i) checkpoint() is called before the any job has been executed on this
- RDD.
-
- (ii) This RDD has been made to persist in memory. Otherwise saving it
+ Mark this RDD for checkpointing. It will be saved to a file inside the
+ checkpoint directory set with L{SparkContext.setCheckpointDir()} and
+ all references to its parent RDDs will be removed. This function must
+ be called before any job has been executed on this RDD. It is strongly
+ recommended that this RDD be persisted in memory; otherwise, saving it
on a file will require recomputation.
"""
self.is_checkpointed = True
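
Putting the two halves together, a minimal sketch of the recommended pattern from the docstring above: set the checkpoint directory, cache the RDD so writing the checkpoint does not force recomputation, and call checkpoint() before any job has run (the data, path, and app name are illustrative):

    from pyspark import SparkContext

    sc = SparkContext("local", "checkpoint-demo")
    sc.setCheckpointDir("/tmp/demo-checkpoints", useExisting=True)

    rdd = sc.parallelize(range(1000)).map(lambda x: x * x)
    rdd.cache()        # persist in memory so the checkpoint save is cheap
    rdd.checkpoint()   # must be marked before any job runs on this RDD

    # The first action computes the result and writes the checkpoint;
    # the lineage back to the parent RDDs is then truncated.
    print(rdd.count())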