diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-02-01 11:48:11 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-02-01 11:50:27 -0800 |
commit | e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03 (patch) | |
tree | caf8ce5cd59c5d9cfecb5f6d81de4e635c0fa7a1 /python/pyspark/context.py | |
parent | b6a6092177a008cdcd19810d9d3a5715dae927b0 (diff) | |
download | spark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.tar.gz spark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.tar.bz2 spark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.zip |
Use spark.local.dir for PySpark temp files (SPARK-580).
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 12 |
1 file changed, 8 insertions, 4 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ba6896dda3..6831f9b7f8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -1,8 +1,6 @@ import os -import atexit import shutil import sys -import tempfile from threading import Lock from tempfile import NamedTemporaryFile @@ -94,6 +92,11 @@ class SparkContext(object): SparkFiles._sc = self sys.path.append(SparkFiles.getRootDirectory()) + # Create a temporary directory inside spark.local.dir: + local_dir = self._jvm.spark.Utils.getLocalDir() + self._temp_dir = \ + self._jvm.spark.Utils.createTempDir(local_dir).getAbsolutePath() + @property def defaultParallelism(self): """ @@ -126,8 +129,7 @@ class SparkContext(object): # Calling the Java parallelize() method with an ArrayList is too slow, # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). - tempFile = NamedTemporaryFile(delete=False) - atexit.register(lambda: os.unlink(tempFile.name)) + tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir) if self.batchSize != 1: c = batched(c, self.batchSize) for x in c: @@ -247,7 +249,9 @@ class SparkContext(object): def _test(): + import atexit import doctest + import tempfile globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() |