aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/shuffle.py11
1 files changed, 10 insertions, 1 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 5931e923c2..10a7ccd502 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -478,13 +478,21 @@ class ExternalSorter(object):
os.makedirs(d)
return os.path.join(d, str(n))
+ def _next_limit(self):
+ """
+ Return the next memory limit. If the memory is not released
+ after spilling, it will dump the data only when the used memory
+ starts to increase.
+ """
+ return max(self.memory_limit, get_used_memory() * 1.05)
+
def sorted(self, iterator, key=None, reverse=False):
"""
Sort the elements in iterator, do external sort when the memory
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch = 100
+ batch, limit = 100, self._next_limit()
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -504,6 +512,7 @@ class ExternalSorter(object):
chunks.append(self.serializer.load_stream(open(path)))
current_chunk = []
gc.collect()
+ limit = self._next_limit()
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
DiskBytesSpilled += os.path.getsize(path)