aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-16 19:15:14 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-20 13:19:19 -0800
commit7ed1bf4b485131d58ea6728e7247b79320aca9e6 (patch)
tree4c9e91c1c997d328bed7c939fdb69f6e8eed516f /python/pyspark/rdd.py
parentfe85a075117a79675971aff0cd020bba446c0233 (diff)
downloadspark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.tar.gz
spark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.tar.bz2
spark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.zip
Add RDD checkpointing to Python API.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py34
1 files changed, 34 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d705f0f9e1..9b676cae4a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -49,6 +49,40 @@ class RDD(object):
self._jrdd.cache()
return self
+ def checkpoint(self):
+ """
+ Mark this RDD for checkpointing. The RDD will be saved to a file inside
+ `checkpointDir` (set using setCheckpointDir()) and all references to
+ its parent RDDs will be removed. This is used to truncate very long
+ lineages. In the current implementation, Spark will save this RDD to
+ a file (using saveAsObjectFile()) after the first job using this RDD is
+ done. Hence, it is strongly recommended to use checkpoint() on RDDs
+ when
+
+ (i) checkpoint() is called before the any job has been executed on this
+ RDD.
+
+ (ii) This RDD has been made to persist in memory. Otherwise saving it
+ on a file will require recomputation.
+ """
+ self._jrdd.rdd().checkpoint()
+
+ def isCheckpointed(self):
+ """
+ Return whether this RDD has been checkpointed or not
+ """
+ return self._jrdd.rdd().isCheckpointed()
+
+ def getCheckpointFile(self):
+ """
+ Gets the name of the file to which this RDD was checkpointed
+ """
+ checkpointFile = self._jrdd.rdd().getCheckpointFile()
+ if checkpointFile.isDefined():
+ return checkpointFile.get()
+ else:
+ return None
+
# TODO persist(self, storageLevel)
def map(self, f, preservesPartitioning=False):