Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py    |  2 +-
-rw-r--r--  python/pyspark/tests.py  | 11 +++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b18789040..c8e54ed5c6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -595,7 +595,7 @@ class RDD(object):
         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
         samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
-        samples = sorted(samples, reverse=(not ascending), key=keyfunc)
+        samples = sorted(samples, key=keyfunc)
 
         # we have numPartitions many parts but one of the them has
         # an implicit boundary
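
Note on why the one-line change fixes SPARK-5969: the range partitioner assigns each key to a bucket by bisecting the sampled boundaries, and bisect only gives meaningful answers on an ascending list. Sorting the samples in descending order when ascending=False therefore sent almost every key to the first or last partition; descending output is obtained by mirroring the partition index instead. The standalone sketch below (plain Python, no Spark; the helper name and the bounds are illustrative, not the library code) reproduces the effect.

import bisect

def pick_partition(key, bounds, num_partitions, ascending=True):
    # bisect_left is only meaningful when `bounds` is sorted ascending.
    p = bisect.bisect_left(bounds, key)
    # Descending order is obtained by mirroring the index,
    # not by reversing the bounds themselves.
    return p if ascending else num_partitions - 1 - p

keys = list(range(10))
ascending_bounds = [3, 6]    # pivots kept ascending (behaviour after the fix)
descending_bounds = [6, 3]   # pivots sorted descending (pre-fix, ascending=False)

print([pick_partition(k, ascending_bounds, 3, ascending=False) for k in keys])
# -> [2, 2, 2, 2, 1, 1, 1, 0, 0, 0]: all three partitions used, keys descending
print([bisect.bisect_left(descending_bounds, k) for k in keys])
# -> [0, 0, 0, 0, 2, 2, 2, 2, 2, 2]: bisect on a descending list leaves the
#    middle partition empty, which is the SPARK-5969 symptom
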
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 0bd5d20f78..0e3721b55a 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -829,6 +829,17 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
+        # Regression test for SPARK-5969
+        seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence
+        rdd = self.sc.parallelize(seq)
+        for ascending in [True, False]:
+            sort = rdd.sortByKey(ascending=ascending, numPartitions=5)
+            self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending))
+            sizes = sort.glom().map(len).collect()
+            for size in sizes:
+                self.assertGreater(size, 0)
+
 
 class ProfilerTests(PySparkTestCase):
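
For reference, the size assertion in the new test works because glom() collapses each partition into a single list, so mapping len over it yields one element per partition. A rough interactive sketch, assuming a local SparkContext already bound to sc (the data and partition count mirror the test but are otherwise illustrative):

pairs = [(i * 59 % 101, i) for i in range(101)]
sort = sc.parallelize(pairs).sortByKey(ascending=False, numPartitions=5)

# glom() turns each partition into a single list, so map(len) yields
# one size per partition.
sizes = sort.glom().map(len).collect()
print(sizes)   # with the fix applied, all five partitions are non-empty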