aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/shuffle.py5
-rw-r--r--python/pyspark/tests.py5
2 files changed, 5 insertions, 5 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 81c420ce16..67752c0d15 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch, limit = 100, self.memory_limit
+ batch, limit = 100, self._next_limit()
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -512,9 +512,6 @@ class ExternalSorter(object):
f.close()
chunks.append(load(open(path, 'rb')))
current_chunk = []
- gc.collect()
- batch //= 2
- limit = self._next_limit()
MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 11b402e6df..7826542368 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -179,9 +179,12 @@ class SorterTests(unittest.TestCase):
list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
def test_external_sort(self):
+ class CustomizedSorter(ExternalSorter):
+ def _next_limit(self):
+ return self.memory_limit
l = list(range(1024))
random.shuffle(l)
- sorter = ExternalSorter(1)
+ sorter = CustomizedSorter(1)
self.assertEqual(sorted(l), list(sorter.sorted(l)))
self.assertGreater(shuffle.DiskBytesSpilled, 0)
last = shuffle.DiskBytesSpilled