aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-09-08 11:20:00 -0700
committerMatei Zaharia <matei@databricks.com>2014-09-08 11:20:00 -0700
commit16a73c2473181e03d88001aa3e08e6ffac92eb8b (patch)
treefc6746e31bc239087505248e0efc1ad58f383f2f /python/pyspark/rdd.py
parente16a8e7db5a3b1065b14baf89cb723a59b99226b (diff)
downloadspark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.tar.gz
spark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.tar.bz2
spark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.zip
SPARK-2978. Transformation with MR shuffle semantics
I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful. Author: Sandy Ryza <sandy@cloudera.com> Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits: 4a5332a [Sandy Ryza] Fix Java test c04b447 [Sandy Ryza] Fix Python doc and add back deleted code 433ad5b [Sandy Ryza] Add Java test 4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes 9b0ba99 [Sandy Ryza] Fix compilation 36e0571 [Sandy Ryza] Fix import ordering 48c12c2 [Sandy Ryza] Add Java version and additional doc e5381cd [Sandy Ryza] Fix python style warnings f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 266090e3ae..5667154cb8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -520,6 +520,30 @@ class RDD(object):
raise TypeError
return self.union(other)
+ def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
+ ascending=True, keyfunc=lambda x: x):
+ """
+ Repartition the RDD according to the given partitioner and, within each resulting partition,
+ sort records by their keys.
+
+ >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
+ >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
+ >>> rdd2.glom().collect()
+ [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
+ """
+ if numPartitions is None:
+ numPartitions = self._defaultReducePartitions()
+
+ spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
+ memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+ serializer = self._jrdd_deserializer
+
+ def sortPartition(iterator):
+ sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+
+ return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
+
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
"""
Sorts this RDD, which is assumed to consist of (key, value) pairs.