diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-09-08 11:20:00 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-09-08 11:20:00 -0700 |
commit | 16a73c2473181e03d88001aa3e08e6ffac92eb8b (patch) | |
tree | fc6746e31bc239087505248e0efc1ad58f383f2f /python/pyspark | |
parent | e16a8e7db5a3b1065b14baf89cb723a59b99226b (diff) | |
download | spark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.tar.gz spark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.tar.bz2 spark-16a73c2473181e03d88001aa3e08e6ffac92eb8b.zip |
SPARK-2978. Transformation with MR shuffle semantics
I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits:
4a5332a [Sandy Ryza] Fix Java test
c04b447 [Sandy Ryza] Fix Python doc and add back deleted code
433ad5b [Sandy Ryza] Add Java test
4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes
9b0ba99 [Sandy Ryza] Fix compilation
36e0571 [Sandy Ryza] Fix import ordering
48c12c2 [Sandy Ryza] Add Java version and additional doc
e5381cd [Sandy Ryza] Fix python style warnings
f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/rdd.py | 24 | ||||
-rw-r--r-- | python/pyspark/tests.py | 8 |
2 files changed, 32 insertions, 0 deletions
```diff
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 266090e3ae..5667154cb8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -520,6 +520,30 @@ class RDD(object):
             raise TypeError
         return self.union(other)
 
+    def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
+                                           ascending=True, keyfunc=lambda x: x):
+        """
+        Repartition the RDD according to the given partitioner and, within each resulting partition,
+        sort records by their keys.
+
+        >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
+        >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, 2)
+        >>> rdd2.glom().collect()
+        [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
+        """
+        if numPartitions is None:
+            numPartitions = self._defaultReducePartitions()
+
+        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
+        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+        serializer = self._jrdd_deserializer
+
+        def sortPartition(iterator):
+            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+
+        return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
+
     def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
         """
         Sorts this RDD, which is assumed to consist of (key, value) pairs.
```
```diff
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 9fbeb36f4f..0bd2a9e6c5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -545,6 +545,14 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
         self.assertRaises(TypeError, lambda: rdd.histogram(2))
 
+    def test_repartitionAndSortWithinPartitions(self):
+        rdd = self.sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)], 2)
+
+        repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2)
+        partitions = repartitioned.glom().collect()
+        self.assertEquals(partitions[0], [(0, 5), (0, 8), (2, 6)])
+        self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)])
+
 
 class TestSQL(PySparkTestCase):
 
```