Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--  python/pyspark/shuffle.py  7
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b54baa57ec..1d0b16cade 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch, limit = 100, self._next_limit()
+        batch, limit = 100, self.memory_limit
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -497,7 +497,7 @@ class ExternalSorter(object):
                 break

             used_memory = get_used_memory()
-            if used_memory > self.memory_limit:
+            if used_memory > limit:
                 # sort them inplace will save memory
                 current_chunk.sort(key=key, reverse=reverse)
                 path = self._get_path(len(chunks))
@@ -513,13 +513,14 @@ class ExternalSorter(object):
                 chunks.append(load(open(path, 'rb')))
                 current_chunk = []
                 gc.collect()
+                batch //= 2
                 limit = self._next_limit()
                 MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
                 DiskBytesSpilled += os.path.getsize(path)
                 os.unlink(path)  # data will be deleted after close

             elif not chunks:
-                batch = min(batch * 2, 10000)
+                batch = min(int(batch * 1.5), 10000)

         current_chunk.sort(key=key, reverse=reverse)
         if not chunks:
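
Taken together, these hunks change how ExternalSorter.sorted tunes its read-ahead: the spill threshold starts at the plain memory_limit and is only raised via _next_limit() after a spill, the batch size is halved after every spill, and it grows by 1.5x (instead of doubling) only while nothing has been spilled yet. The sketch below is a minimal, self-contained illustration of that batching pattern, not PySpark code: adaptive_batches, the fake memory probe, and the limit update are invented stand-ins for the real serializer/spill machinery (self._get_path, self.serializer, get_used_memory, self._next_limit).

import itertools


def adaptive_batches(iterator, get_used_memory, memory_limit):
    """Yield ('spill', chunk) whenever memory crosses the limit, then a final ('keep', chunk)."""
    batch, limit = 100, memory_limit
    spills, current_chunk = 0, []
    iterator = iter(iterator)
    while True:
        # pick elements in batch, as ExternalSorter.sorted does
        chunk = list(itertools.islice(iterator, batch))
        current_chunk.extend(chunk)
        if len(chunk) < batch:
            break

        used_memory = get_used_memory()
        if used_memory > limit:
            yield 'spill', current_chunk            # the real code sorts and dumps this to disk
            spills, current_chunk = spills + 1, []
            batch = max(batch // 2, 1)              # halve the read-ahead after a spill
            limit = max(memory_limit, int(used_memory * 1.05))  # rough stand-in for _next_limit()
        elif not spills:
            batch = min(int(batch * 1.5), 10000)    # grow by 1.5x only before the first spill
    yield 'keep', current_chunk


# Example run with a fake memory probe that periodically crosses a 50 MB limit.
fake_memory = itertools.cycle([10, 30, 60]).__next__
for action, chunk in adaptive_batches(range(3000), fake_memory, memory_limit=50):
    print(action, len(chunk))

With this probe the sketch spills once (475 elements) and keeps the remaining 2525 in memory, showing the gentler ramp-up before the first spill and the shrinking batch afterwards.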