From e211f405bcb3cf02c3ae589cf81d9c9dfc70bc03 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 1 Feb 2013 11:48:11 -0800 Subject: Use spark.local.dir for PySpark temp files (SPARK-580). --- python/pyspark/context.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'python/pyspark/context.py') diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ba6896dda3..6831f9b7f8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -1,8 +1,6 @@ import os -import atexit import shutil import sys -import tempfile from threading import Lock from tempfile import NamedTemporaryFile @@ -94,6 +92,11 @@ class SparkContext(object): SparkFiles._sc = self sys.path.append(SparkFiles.getRootDirectory()) + # Create a temporary directory inside spark.local.dir: + local_dir = self._jvm.spark.Utils.getLocalDir() + self._temp_dir = \ + self._jvm.spark.Utils.createTempDir(local_dir).getAbsolutePath() + @property def defaultParallelism(self): """ @@ -126,8 +129,7 @@ class SparkContext(object): # Calling the Java parallelize() method with an ArrayList is too slow, # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). - tempFile = NamedTemporaryFile(delete=False) - atexit.register(lambda: os.unlink(tempFile.name)) + tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir) if self.batchSize != 1: c = batched(c, self.batchSize) for x in c: @@ -247,7 +249,9 @@ class SparkContext(object): def _test(): + import atexit import doctest + import tempfile globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() -- cgit v1.2.3