about · summary · refs · log · tree · commit · diff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  24
1 file changed, 24 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 266090e3ae..5667154cb8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -520,6 +520,30 @@ class RDD(object):
raise TypeError
return self.union(other)
+ def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
+ ascending=True, keyfunc=lambda x: x):
+ """
+ Repartition the RDD according to the given partitioner and, within each resulting partition,
+ sort records by their keys.
+
+ >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
+ >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
+ >>> rdd2.glom().collect()
+ [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
+ """
+ if numPartitions is None:
+ numPartitions = self._defaultReducePartitions()
+
+ spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
+ memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+ serializer = self._jrdd_deserializer
+
+ def sortPartition(iterator):
+ sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+
+ return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
+
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
"""
Sorts this RDD, which is assumed to consist of (key, value) pairs.