diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-08-26 16:57:40 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-08-26 16:57:40 -0700 |
commit | f1e71d4c3ba678fc108effb05cf2d6101dadc0ce (patch) | |
tree | ef5c761a9bf3a75c59b03148985a4a83e64a2c16 /python/pyspark/tests.py | |
parent | c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7 (diff) | |
download | spark-f1e71d4c3ba678fc108effb05cf2d6101dadc0ce.tar.gz spark-f1e71d4c3ba678fc108effb05cf2d6101dadc0ce.tar.bz2 spark-f1e71d4c3ba678fc108effb05cf2d6101dadc0ce.zip |
[SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey()
Using external sort to support sort large datasets in reduce stage.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1978 from davies/sort and squashes the following commits:
bbcd9ba [Davies Liu] check spilled bytes in tests
b125d2f [Davies Liu] add test for external sort in rdd
eae0176 [Davies Liu] choose different disks from different processes and instances
1f075ed [Davies Liu] Merge branch 'master' into sort
eb53ca6 [Davies Liu] Merge branch 'master' into sort
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 42 |
1 files changed, 41 insertions, 1 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1db922f513..3e7040eade 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -30,6 +30,7 @@ import sys import tempfile import time import zipfile +import random if sys.version_info[:2] <= (2, 6): import unittest2 as unittest @@ -37,10 +38,11 @@ else: import unittest +from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.files import SparkFiles from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer -from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger +from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter _have_scipy = False _have_numpy = False @@ -117,6 +119,44 @@ class TestMerger(unittest.TestCase): m._cleanup() +class TestSorter(unittest.TestCase): + def test_in_memory_sort(self): + l = range(1024) + random.shuffle(l) + sorter = ExternalSorter(1024) + self.assertEquals(sorted(l), list(sorter.sorted(l))) + self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) + self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) + self.assertEquals(sorted(l, key=lambda x: -x, reverse=True), + list(sorter.sorted(l, key=lambda x: -x, reverse=True))) + + def test_external_sort(self): + l = range(1024) + random.shuffle(l) + sorter = ExternalSorter(1) + self.assertEquals(sorted(l), list(sorter.sorted(l))) + self.assertGreater(sorter._spilled_bytes, 0) + last = sorter._spilled_bytes + self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True))) + self.assertGreater(sorter._spilled_bytes, last) + last = sorter._spilled_bytes + self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x))) + self.assertGreater(sorter._spilled_bytes, last) + last = sorter._spilled_bytes + self.assertEquals(sorted(l, key=lambda x: -x, reverse=True), + list(sorter.sorted(l, key=lambda x: -x, reverse=True))) + self.assertGreater(sorter._spilled_bytes, last) + + def test_external_sort_in_rdd(self): + conf = SparkConf().set("spark.python.worker.memory", "1m") + sc = SparkContext(conf=conf) + l = range(10240) + random.shuffle(l) + rdd = sc.parallelize(l, 10) + self.assertEquals(sorted(l), rdd.sortBy(lambda x: x).collect()) + sc.stop() + + class SerializationTestCase(unittest.TestCase): def test_namedtuple(self): |