aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/context.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-02-01 11:48:11 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-02-01 11:50:27 -0800
commite211f405bcb3cf02c3ae589cf81d9c9dfc70bc03 (patch)
treecaf8ce5cd59c5d9cfecb5f6d81de4e635c0fa7a1 /python/pyspark/context.py
parentb6a6092177a008cdcd19810d9d3a5715dae927b0 (diff)
downloadspark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.tar.gz
spark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.tar.bz2
spark-e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03.zip
Use spark.local.dir for PySpark temp files (SPARK-580).
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--python/pyspark/context.py12
1 file changed, 8 insertions, 4 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index ba6896dda3..6831f9b7f8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,8 +1,6 @@
import os
-import atexit
import shutil
import sys
-import tempfile
from threading import Lock
from tempfile import NamedTemporaryFile
@@ -94,6 +92,11 @@ class SparkContext(object):
SparkFiles._sc = self
sys.path.append(SparkFiles.getRootDirectory())
+ # Create a temporary directory inside spark.local.dir:
+ local_dir = self._jvm.spark.Utils.getLocalDir()
+ self._temp_dir = \
+ self._jvm.spark.Utils.createTempDir(local_dir).getAbsolutePath()
+
@property
def defaultParallelism(self):
"""
@@ -126,8 +129,7 @@ class SparkContext(object):
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().
- tempFile = NamedTemporaryFile(delete=False)
- atexit.register(lambda: os.unlink(tempFile.name))
+ tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
if self.batchSize != 1:
c = batched(c, self.batchSize)
for x in c:
@@ -247,7 +249,9 @@ class SparkContext(object):
def _test():
+ import atexit
import doctest
+ import tempfile
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()