diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-31 10:12:51 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-31 10:12:51 -0800 |
commit | 55b7e2fdffc6c3537da69152a3d02d5be599fa1b (patch) | |
tree | 6ac21f7d330f020b017534cdc4cfd249426015d5 /python/pyspark | |
parent | 50e3b8ec4c8150f1cfc6b92f8871f520adf2cfda (diff) | |
parent | fcd17a1e8ef1d0f106e845f4de99533d61cd8695 (diff) | |
download | spark-55b7e2fdffc6c3537da69152a3d02d5be599fa1b.tar.gz spark-55b7e2fdffc6c3537da69152a3d02d5be599fa1b.tar.bz2 spark-55b7e2fdffc6c3537da69152a3d02d5be599fa1b.zip |
Merge pull request #289 from tdas/filestream-fix
Bug fixes for file input stream and checkpointing
- Fixed bugs in the file input stream that led the stream to fail due to transient HDFS errors (e.g., listing files while a background thread is deleting them caused errors).
- Updated Spark's CheckpointRDD and Streaming's CheckpointWriter to use SparkContext.hadoopConfiguration, to allow checkpoints to be written to any HDFS-compatible store requiring special configuration.
- Changed the API of SparkContext.setCheckpointDir() - eliminated the unnecessary 'useExisting' parameter. Now SparkContext will always create a unique subdirectory within the user specified checkpoint directory. This is to ensure that previous checkpoint files are not accidentally overwritten.
- Fixed bug where setting checkpoint directory as a relative local path caused the checkpointing to fail.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/context.py | 9 | ||||
-rw-r--r-- | python/pyspark/tests.py | 4 |
2 files changed, 4 insertions, 9 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0604f6836c..108f36576a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -320,17 +320,12 @@ class SparkContext(object): self._python_includes.append(filename) sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode - def setCheckpointDir(self, dirName, useExisting=False): + def setCheckpointDir(self, dirName): """ Set the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster. - - If the directory does not exist, it will be created. If the directory - exists and C{useExisting} is set to true, then the exisiting directory - will be used. Otherwise an exception will be thrown to prevent - accidental overriding of checkpoint files in the existing directory. """ - self._jsc.sc().setCheckpointDir(dirName, useExisting) + self._jsc.sc().setCheckpointDir(dirName) def _getJavaStorageLevel(self, storageLevel): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3987642bf4..7acb6eaf10 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -73,8 +73,8 @@ class TestCheckpoint(PySparkTestCase): time.sleep(1) # 1 second self.assertTrue(flatMappedRDD.isCheckpointed()) self.assertEqual(flatMappedRDD.collect(), result) - self.assertEqual(self.checkpointDir.name, - os.path.dirname(flatMappedRDD.getCheckpointFile())) + self.assertEqual("file:" + self.checkpointDir.name, + os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile()))) def test_checkpoint_and_restore(self): parCollection = self.sc.parallelize([1, 2, 3, 4]) |