author    Patrick Wendell <pwendell@gmail.com>  2013-12-31 10:12:51 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-12-31 10:12:51 -0800
commit    55b7e2fdffc6c3537da69152a3d02d5be599fa1b (patch)
tree      6ac21f7d330f020b017534cdc4cfd249426015d5 /python
parent    50e3b8ec4c8150f1cfc6b92f8871f520adf2cfda (diff)
parent    fcd17a1e8ef1d0f106e845f4de99533d61cd8695 (diff)
Merge pull request #289 from tdas/filestream-fix
Bug fixes for file input stream and checkpointing:

- Fixed bugs in the file input stream that caused the stream to fail on transient HDFS errors (e.g., listing files while a background thread is deleting them caused errors).
- Updated Spark's CheckpointRDD and Streaming's CheckpointWriter to use SparkContext.hadoopConfiguration, to allow checkpoints to be written to any HDFS-compatible store that requires special configuration.
- Changed the API of SparkContext.setCheckpointDir(): eliminated the unnecessary 'useExisting' parameter. SparkContext now always creates a unique subdirectory within the user-specified checkpoint directory, ensuring that previous checkpoint files are not accidentally overwritten.
- Fixed a bug where setting the checkpoint directory to a relative local path caused checkpointing to fail.
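A minimal usage sketch of the changed setCheckpointDir() API (the checkpoint path and app name below are illustrative assumptions, not from this patch): the 'useExisting' flag is gone, and Spark now creates a unique subdirectory under whatever directory is given.

    from pyspark import SparkContext

    sc = SparkContext("local", "checkpoint-demo")
    # Must be an HDFS path when running on a cluster; a local dir suffices in local mode.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    rdd = sc.parallelize([1, 2, 3, 4]).flatMap(lambda x: [x, x * 2])
    rdd.checkpoint()
    rdd.count()  # an action forces evaluation, which materializes the checkpoint
    # The checkpoint lands under a generated subdirectory, e.g.
    # file:/tmp/spark-checkpoints/<uuid>/rdd-<id>
    print(rdd.getCheckpointFile())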
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py  9
-rw-r--r--  python/pyspark/tests.py    4
2 files changed, 4 insertions, 9 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 0604f6836c..108f36576a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -320,17 +320,12 @@ class SparkContext(object):
             self._python_includes.append(filename)
             sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
 
-    def setCheckpointDir(self, dirName, useExisting=False):
+    def setCheckpointDir(self, dirName):
         """
         Set the directory under which RDDs are going to be checkpointed. The
         directory must be a HDFS path if running on a cluster.
-
-        If the directory does not exist, it will be created. If the directory
-        exists and C{useExisting} is set to true, then the exisiting directory
-        will be used. Otherwise an exception will be thrown to prevent
-        accidental overriding of checkpoint files in the existing directory.
         """
-        self._jsc.sc().setCheckpointDir(dirName, useExisting)
+        self._jsc.sc().setCheckpointDir(dirName)
 
     def _getJavaStorageLevel(self, storageLevel):
         """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3987642bf4..7acb6eaf10 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase):
         time.sleep(1) # 1 second
         self.assertTrue(flatMappedRDD.isCheckpointed())
         self.assertEqual(flatMappedRDD.collect(), result)
-        self.assertEqual(self.checkpointDir.name,
-                         os.path.dirname(flatMappedRDD.getCheckpointFile()))
+        self.assertEqual("file:" + self.checkpointDir.name,
+                         os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
 
     def test_checkpoint_and_restore(self):
         parCollection = self.sc.parallelize([1, 2, 3, 4])
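The test change mirrors the new directory layout: getCheckpointFile() now returns a path like file:/<checkpointDir>/<uuid>/rdd-<id>, so the test strips two path components and prepends the "file:" scheme to recover the directory passed to setCheckpointDir(). A hypothetical illustration (the UUID segment shown is made up; SparkContext generates it):

    import os

    checkpoint_file = "file:/tmp/spark-checkpoints/0d3c8f7e/rdd-7"  # illustrative value
    # Two dirname() calls peel off "rdd-7" and the generated UUID subdirectory.
    parent = os.path.dirname(os.path.dirname(checkpoint_file))
    assert parent == "file:/tmp/spark-checkpoints"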