aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2014-11-19 15:45:37 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-11-19 15:45:37 -0800
commit73c8ea84a668f443eb18ce15ba97023da041d808 (patch)
tree15488419d9a2712edcc1efdb0810059c340570c8 /python/pyspark/shuffle.py
parentf9adda9afb63bfdb722be95304f991a3b38a54b3 (diff)
downloadspark-73c8ea84a668f443eb18ce15ba97023da041d808.tar.gz
spark-73c8ea84a668f443eb18ce15ba97023da041d808.tar.bz2
spark-73c8ea84a668f443eb18ce15ba97023da041d808.zip
[SPARK-4384] [PySpark] improve sort spilling
If there some big broadcasts (or other object) in Python worker, the free memory could be used for sorting will be too small, then it will keep spilling small files into disks, finally failed with too many open files. This PR try to delay the spilling until the used memory goes over limit and start to increase since last spilling, it will increase the size of spilling files, improve the stability and performance in this cases. (We also do this in ExternalAggregator). Author: Davies Liu <davies@databricks.com> Closes #3252 from davies/sort and squashes the following commits: 711fb6c [Davies Liu] improve sort spilling
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py11
1 files changed, 10 insertions, 1 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 5931e923c2..10a7ccd502 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -478,13 +478,21 @@ class ExternalSorter(object):
os.makedirs(d)
return os.path.join(d, str(n))
+ def _next_limit(self):
+ """
+ Return the next memory limit. If the memory is not released
+ after spilling, it will dump the data only when the used memory
+ starts to increase.
+ """
+ return max(self.memory_limit, get_used_memory() * 1.05)
+
def sorted(self, iterator, key=None, reverse=False):
"""
Sort the elements in iterator, do external sort when the memory
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch = 100
+ batch, limit = 100, self._next_limit()
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -504,6 +512,7 @@ class ExternalSorter(object):
chunks.append(self.serializer.load_stream(open(path)))
current_chunk = []
gc.collect()
+ limit = self._next_limit()
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
DiskBytesSpilled += os.path.getsize(path)