Diffstat (limited to 'python/pyspark/context.py')
 python/pyspark/context.py | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 529d16b480..cb15b4b91f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -428,15 +428,19 @@ class SparkContext(object):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
         tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
-        # Make sure we distribute data evenly if it's smaller than self.batchSize
-        if "__len__" not in dir(c):
-            c = list(c)  # Make it a list so we can compute its length
-        batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
-        serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
-        serializer.dump_stream(c, tempFile)
-        tempFile.close()
-        readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
-        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
+        try:
+            # Make sure we distribute data evenly if it's smaller than self.batchSize
+            if "__len__" not in dir(c):
+                c = list(c)  # Make it a list so we can compute its length
+            batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
+            serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
+            serializer.dump_stream(c, tempFile)
+            tempFile.close()
+            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+            jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
+        finally:
+            # readRDDFromFile eagerly reads the file, so we can delete it right after.
+            os.unlink(tempFile.name)
         return RDD(jrdd, self, serializer)
 
     def pickleFile(self, name, minPartitions=None):
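The substance of the patch is the try/finally around the temp-file write and read: the serialized data is written to a named temp file, handed to the JVM, and the file is unlinked even if serialization or the Py4J call fails. The following is a minimal, standalone sketch of that cleanup pattern only, not PySpark's actual implementation; load_eagerly is a hypothetical stand-in for PythonRDD.readRDDFromFile, and plain pickle stands in for the batched serializer.

    import os
    import pickle
    from tempfile import NamedTemporaryFile

    def load_eagerly(path):
        # Hypothetical consumer: reads the whole file before returning,
        # so the file can be unlinked immediately afterwards.
        with open(path, "rb") as f:
            return pickle.load(f)

    def parallelize_via_tempfile(data):
        tempFile = NamedTemporaryFile(delete=False)
        try:
            pickle.dump(list(data), tempFile)
            tempFile.close()
            return load_eagerly(tempFile.name)
        finally:
            # Runs on success and on error, so the temp file never leaks.
            os.unlink(tempFile.name)

    print(parallelize_via_tempfile(range(5)))  # [0, 1, 2, 3, 4]

Deleting the file in finally rather than after the read keeps the cleanup on every exit path, which is exactly what the patch adds over the previous unconditional-but-skippable unlink-free flow.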