diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 266090e3ae..5667154cb8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -520,6 +520,30 @@ class RDD(object): raise TypeError return self.union(other) + def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash, + ascending=True, keyfunc=lambda x: x): + """ + Repartition the RDD according to the given partitioner and, within each resulting partition, + sort records by their keys. + + >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]) + >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2) + >>> rdd2.glom().collect() + [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]] + """ + if numPartitions is None: + numPartitions = self._defaultReducePartitions() + + spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true") + memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m")) + serializer = self._jrdd_deserializer + + def sortPartition(iterator): + sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted + return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending))) + + return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True) + def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): """ Sorts this RDD, which is assumed to consist of (key, value) pairs. |