diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-16 19:15:14 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-20 13:19:19 -0800 |
commit | 7ed1bf4b485131d58ea6728e7247b79320aca9e6 (patch) | |
tree | 4c9e91c1c997d328bed7c939fdb69f6e8eed516f /python/pyspark/rdd.py | |
parent | fe85a075117a79675971aff0cd020bba446c0233 (diff) | |
download | spark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.tar.gz spark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.tar.bz2 spark-7ed1bf4b485131d58ea6728e7247b79320aca9e6.zip |
Add RDD checkpointing to Python API.
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 34 |
1 file changed, 34 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d705f0f9e1..9b676cae4a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -49,6 +49,40 @@ class RDD(object): self._jrdd.cache() return self + def checkpoint(self): + """ + Mark this RDD for checkpointing. The RDD will be saved to a file inside + `checkpointDir` (set using setCheckpointDir()) and all references to + its parent RDDs will be removed. This is used to truncate very long + lineages. In the current implementation, Spark will save this RDD to + a file (using saveAsObjectFile()) after the first job using this RDD is + done. Hence, it is strongly recommended to use checkpoint() on RDDs + when + + (i) checkpoint() is called before any job has been executed on this + RDD. + + (ii) This RDD has been made to persist in memory. Otherwise saving it + on a file will require recomputation. + """ + self._jrdd.rdd().checkpoint() + + def isCheckpointed(self): + """ + Return whether this RDD has been checkpointed or not + """ + return self._jrdd.rdd().isCheckpointed() + + def getCheckpointFile(self): + """ + Gets the name of the file to which this RDD was checkpointed + """ + checkpointFile = self._jrdd.rdd().getCheckpointFile() + if checkpointFile.isDefined(): + return checkpointFile.get() + else: + return None + # TODO persist(self, storageLevel) def map(self, f, preservesPartitioning=False): |