aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMilan Straka <fox@ucw.cz>2015-04-10 13:50:32 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-10 15:20:55 -0700
commit48321b83dbe3fb1f0c3b7d7c4b47c3b1ffa06d68 (patch)
tree2b4c2c9af5309bed9097bac9df373e52eb154b95
parentec3e76f1e47bfe1bdd3a010c004124d3bb17c3d1 (diff)
downloadspark-48321b83dbe3fb1f0c3b7d7c4b47c3b1ffa06d68.tar.gz
spark-48321b83dbe3fb1f0c3b7d7c4b47c3b1ffa06d68.tar.bz2
spark-48321b83dbe3fb1f0c3b7d7c4b47c3b1ffa06d68.zip
[SPARK-5969][PySpark] Fix descending pyspark.rdd.sortByKey.
The samples should always be sorted in ascending order, because bisect.bisect_left is used on it. The reverse order of the result is already achieved in rangePartitioner by reversing the found index. The current implementation also work, but always uses only two partitions -- the first one and the last one (because the bisect_left return returns either "beginning" or "end" for a descending sequence). Author: Milan Straka <fox@ucw.cz> This patch had conflicts when merged, resolved by Committer: Josh Rosen <joshrosen@databricks.com> Closes #4761 from foxik/fix-descending-sort and squashes the following commits: 95896b5 [Milan Straka] Add regression test for SPARK-5969. 5757490 [Milan Straka] Fix descending pyspark.rdd.sortByKey.
-rw-r--r--python/pyspark/rdd.py2
-rw-r--r--python/pyspark/tests.py11
2 files changed, 12 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a1974de261..eb8c6b4253 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -589,7 +589,7 @@ class RDD(object):
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
- samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+ samples = sorted(samples, key=keyfunc)
# we have numPartitions many parts but one of the them has
# an implicit boundary
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index dd8d3b1c53..c10f857c3d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -787,6 +787,17 @@ class RDDTests(ReusedPySparkTestCase):
rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()
+ def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
+ # Regression test for SPARK-5969
+ seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence
+ rdd = self.sc.parallelize(seq)
+ for ascending in [True, False]:
+ sort = rdd.sortByKey(ascending=ascending, numPartitions=5)
+ self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending))
+ sizes = sort.glom().map(len).collect()
+ for size in sizes:
+ self.assertGreater(size, 0)
+
class ProfilerTests(PySparkTestCase):