aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-06-18 13:45:58 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-06-18 13:45:58 -0700
commit9b2002722273f98e193ad6cd54c9626292ab27d1 (patch)
treee78f4f6e47fbcbf7e062942407bcf2c380717b9c
parent31641128b34d6f2aa7cb67324c24dd8b3ed84689 (diff)
downloadspark-9b2002722273f98e193ad6cd54c9626292ab27d1.tar.gz
spark-9b2002722273f98e193ad6cd54c9626292ab27d1.tar.bz2
spark-9b2002722273f98e193ad6cd54c9626292ab27d1.zip
[SPARK-8202] [PYSPARK] fix infinite loop during external sort in PySpark
The batch size during external sort will grow up to a maximum of 10000, then shrink down to zero, causing an infinite loop. Given the assumption that the items usually have similar sizes, we don't need to adjust the batch size after the first spill. cc JoshRosen rxin angelini Author: Davies Liu <davies@databricks.com> Closes #6714 from davies/batch_size and squashes the following commits: b170dfb [Davies Liu] update test b9be832 [Davies Liu] Merge branch 'batch_size' of github.com:davies/spark into batch_size 6ade745 [Davies Liu] update test 5c21777 [Davies Liu] Update shuffle.py e746aec [Davies Liu] fix batch size during sort
-rw-r--r--python/pyspark/shuffle.py5
-rw-r--r--python/pyspark/tests.py5
2 files changed, 5 insertions, 5 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 81c420ce16..67752c0d15 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch, limit = 100, self.memory_limit
+ batch, limit = 100, self._next_limit()
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -512,9 +512,6 @@ class ExternalSorter(object):
f.close()
chunks.append(load(open(path, 'rb')))
current_chunk = []
- gc.collect()
- batch //= 2
- limit = self._next_limit()
MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 11b402e6df..7826542368 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -179,9 +179,12 @@ class SorterTests(unittest.TestCase):
list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
def test_external_sort(self):
+ class CustomizedSorter(ExternalSorter):
+ def _next_limit(self):
+ return self.memory_limit
l = list(range(1024))
random.shuffle(l)
- sorter = ExternalSorter(1)
+ sorter = CustomizedSorter(1)
self.assertEqual(sorted(l), list(sorter.sorted(l)))
self.assertGreater(shuffle.DiskBytesSpilled, 0)
last = shuffle.DiskBytesSpilled