path: root/python/pyspark/rdd.py
author     Davies Liu <davies.liu@gmail.com>      2014-08-26 16:57:40 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-08-26 16:57:40 -0700
commit     f1e71d4c3ba678fc108effb05cf2d6101dadc0ce (patch)
tree       ef5c761a9bf3a75c59b03148985a4a83e64a2c16 /python/pyspark/rdd.py
parent     c4787a3690a9ed3b8b2c6c294fc4a6915436b6f7 (diff)
[SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey()
Use external sort to support sorting large datasets in the reduce stage.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1978 from davies/sort and squashes the following commits:

bbcd9ba [Davies Liu] check spilled bytes in tests
b125d2f [Davies Liu] add test for external sort in rdd
eae0176 [Davies Liu] choose different disks from different processes and instances
1f075ed [Davies Liu] Merge branch 'master' into sort
eb53ca6 [Davies Liu] Merge branch 'master' into sort
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
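For background, the sketch below illustrates the general external merge-sort technique this patch relies on: sort bounded-size chunks in memory, spill each sorted run to a temporary file, then lazily merge the runs. It is a simplified illustration only, not the actual pyspark.shuffle.ExternalSorter implementation; the function name, chunk size, and pickle-based spill format are assumptions, and heapq.merge(key=..., reverse=...) requires Python 3.5+.

# Simplified sketch of external merge sort; NOT the real
# pyspark.shuffle.ExternalSorter. Names and chunk size are
# illustrative assumptions.
import heapq
import itertools
import pickle
import tempfile

def external_sorted(iterator, key=None, reverse=False, chunk_size=100000):
    runs = []
    while True:
        # Sort one bounded-size chunk in memory.
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            break
        chunk.sort(key=key, reverse=reverse)
        # Spill the sorted run to a temporary file.
        f = tempfile.TemporaryFile()
        for item in chunk:
            pickle.dump(item, f)
        f.seek(0)
        runs.append(f)

    def read_run(f):
        # Stream items back from one spilled run.
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

    # Merge the sorted runs lazily: heapq.merge holds only one item
    # per run in memory at a time.
    return heapq.merge(*(read_run(f) for f in runs), key=key, reverse=reverse)

The real ExternalSorter sorts in memory and spills only once its memory limit is exceeded; the diff below passes it memory * 0.9, which leaves some headroom below the configured worker memory.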
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py | 9
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3a2e7649e6..31919741e9 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -44,7 +44,7 @@ from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
-    get_used_memory
+    get_used_memory, ExternalSorter
 from py4j.java_collections import ListConverter, MapConverter
@@ -605,8 +605,13 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
+        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true')
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+        serializer = self._jrdd_deserializer
+
         def sortPartition(iterator):
-            return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending))
+            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
 
         if numPartitions == 1:
             if self.getNumPartitions() > 1:
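A usage sketch of the patched path from the driver side: the two config keys are exactly those read in sortPartition() above; the app name, data, and memory value are illustrative assumptions.

# Usage sketch exercising the spilling sort path added by this patch.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.shuffle.spill", "true")          # enable spilling (the default)
        .set("spark.python.worker.memory", "512m"))  # soft per-worker limit
sc = SparkContext(appName="external-sort-demo", conf=conf)

pairs = sc.parallelize([(i % 100, i) for i in range(100000)], 10)
# With spilling enabled, each partition is sorted by ExternalSorter
# using roughly 90% of spark.python.worker.memory before spilling to
# disk, instead of sorting entirely in memory.
print(pairs.sortByKey(ascending=False).take(5))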