aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
author: Tathagata Das <tathagata.das1565@gmail.com> 2013-12-24 14:01:13 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2013-12-24 14:01:13 -0800
commit: d4dfab503a9222b5acf5c4bf69b91c16f298e4aa (patch)
tree: b04fff3dd233e23122ac8f1a0072be8bea0961b9 /python
parent: 9f79fd89dc84cda7ebeb98a0b43c8e982fefa787 (diff)
download: spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.tar.gz
spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.tar.bz2
spark-d4dfab503a9222b5acf5c4bf69b91c16f298e4aa.zip
Fixed Python API for sc.setCheckpointDir. Also other fixes based on Reynold's comments on PR 289.
Diffstat (limited to 'python')
-rw-r--r-- python/pyspark/context.py 9
-rw-r--r-- python/pyspark/tests.py 4
2 files changed, 4 insertions, 9 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 0604f6836c..108f36576a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -320,17 +320,12 @@ class SparkContext(object):
self._python_includes.append(filename)
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
- def setCheckpointDir(self, dirName, useExisting=False):
+ def setCheckpointDir(self, dirName):
"""
Set the directory under which RDDs are going to be checkpointed. The
directory must be a HDFS path if running on a cluster.
-
- If the directory does not exist, it will be created. If the directory
- exists and C{useExisting} is set to true, then the exisiting directory
- will be used. Otherwise an exception will be thrown to prevent
- accidental overriding of checkpoint files in the existing directory.
"""
- self._jsc.sc().setCheckpointDir(dirName, useExisting)
+ self._jsc.sc().setCheckpointDir(dirName)
def _getJavaStorageLevel(self, storageLevel):
"""
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3987642bf4..7acb6eaf10 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase):
time.sleep(1) # 1 second
self.assertTrue(flatMappedRDD.isCheckpointed())
self.assertEqual(flatMappedRDD.collect(), result)
- self.assertEqual(self.checkpointDir.name,
- os.path.dirname(flatMappedRDD.getCheckpointFile()))
+ self.assertEqual("file:" + self.checkpointDir.name,
+ os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
def test_checkpoint_and_restore(self):
parCollection = self.sc.parallelize([1, 2, 3, 4])