author     Davies Liu <davies.liu@gmail.com>      2014-08-26 16:57:40 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-08-26 16:57:40 -0700
commit     f1e71d4c3ba678fc108effb05cf2d6101dadc0ce (patch)
tree       ef5c761a9bf3a75c59b03148985a4a83e64a2c16 /python/pyspark/tests.py
parent     c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7 (diff)
[SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey()
Use external sort to support sorting large datasets in the reduce stage.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1978 from davies/sort and squashes the following commits:

bbcd9ba [Davies Liu] check spilled bytes in tests
b125d2f [Davies Liu] add test for external sort in rdd
eae0176 [Davies Liu] choose different disks from different processes and instances
1f075ed [Davies Liu] Merge branch 'master' into sort
eb53ca6 [Davies Liu] Merge branch 'master' into sort
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
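The idea behind the patch is classic external merge sort: sort what fits in memory, spill each sorted run to disk, then lazily merge the runs. The sketch below is a simplified, hypothetical stand-in for pyspark.shuffle.ExternalSorter (the real class spills based on a memory limit rather than a fixed batch size, and tracks _spilled_bytes); the helper names external_sorted and _read_spill are illustrative, not part of the patch.

    import heapq
    import itertools
    import pickle
    import tempfile

    def _read_spill(f):
        # Stream pickled items back from one spill file.
        f.seek(0)
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

    def external_sorted(items, batch=1000, key=None, reverse=False):
        # Sort each fixed-size batch in memory, spill the sorted run to a
        # temporary file, then lazily k-way merge the runs with heapq.merge.
        # (`batch` is a simplification of the real memory-limit check.)
        it = iter(items)
        runs = []
        while True:
            chunk = sorted(itertools.islice(it, batch), key=key, reverse=reverse)
            if not chunk:
                break
            spill = tempfile.TemporaryFile()
            for item in chunk:
                pickle.dump(item, spill)
            runs.append(_read_spill(spill))
        return heapq.merge(*runs, key=key, reverse=reverse)

Because the merge is lazy, peak memory stays proportional to the batch size times the number of runs' read buffers, not to the dataset size, which is what lets sortBy() and sortByKey() handle data larger than worker memory.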
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  42
1 file changed, 41 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 1db922f513..3e7040eade 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -30,6 +30,7 @@ import sys
import tempfile
import time
import zipfile
+import random
if sys.version_info[:2] <= (2, 6):
import unittest2 as unittest
@@ -37,10 +38,11 @@ else:
import unittest
+from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger
+from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
_have_scipy = False
_have_numpy = False
@@ -117,6 +119,44 @@ class TestMerger(unittest.TestCase):
m._cleanup()
+class TestSorter(unittest.TestCase):
+ def test_in_memory_sort(self):
+ l = range(1024)
+ random.shuffle(l)
+ sorter = ExternalSorter(1024)
+ self.assertEquals(sorted(l), list(sorter.sorted(l)))
+ self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
+ self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
+
+ def test_external_sort(self):
+ l = range(1024)
+ random.shuffle(l)
+ sorter = ExternalSorter(1)
+ self.assertEquals(sorted(l), list(sorter.sorted(l)))
+ self.assertGreater(sorter._spilled_bytes, 0)
+ last = sorter._spilled_bytes
+ self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
+ self.assertGreater(sorter._spilled_bytes, last)
+ last = sorter._spilled_bytes
+ self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertGreater(sorter._spilled_bytes, last)
+ last = sorter._spilled_bytes
+ self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
+ self.assertGreater(sorter._spilled_bytes, last)
+
+ def test_external_sort_in_rdd(self):
+ conf = SparkConf().set("spark.python.worker.memory", "1m")
+ sc = SparkContext(conf=conf)
+ l = range(10240)
+ random.shuffle(l)
+ rdd = sc.parallelize(l, 10)
+ self.assertEquals(sorted(l), rdd.sortBy(lambda x: x).collect())
+ sc.stop()
+
+
class SerializationTestCase(unittest.TestCase):
def test_namedtuple(self):
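
For reference, test_external_sort_in_rdd above exercises the same path an application would hit. A minimal driver-side sketch (assumes a local Spark installation; spark.python.worker.memory is the actual configuration key the patch honors, and capping it at "1m" forces the sort of 10240 elements to spill to disk):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext

    # Cap per-worker memory so the sort cannot complete in memory
    # and must take the external-sort path added by this patch.
    conf = SparkConf().set("spark.python.worker.memory", "1m")
    sc = SparkContext(conf=conf)
    try:
        data = list(range(10240))
        rdd = sc.parallelize(data, 10)
        assert rdd.sortBy(lambda x: x).collect() == sorted(data)
    finally:
        sc.stop()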