diff options
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/rdd.py | 2 | ||||
-rw-r--r-- | python/pyspark/tests.py | 11 |
2 files changed, 12 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1b18789040..c8e54ed5c6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -595,7 +595,7 @@ class RDD(object): maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() - samples = sorted(samples, reverse=(not ascending), key=keyfunc) + samples = sorted(samples, key=keyfunc) # we have numPartitions many parts but one of the them has # an implicit boundary diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 0bd5d20f78..0e3721b55a 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -829,6 +829,17 @@ class RDDTests(ReusedPySparkTestCase): rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x)) rdd._jrdd.first() + def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): + # Regression test for SPARK-5969 + seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence + rdd = self.sc.parallelize(seq) + for ascending in [True, False]: + sort = rdd.sortByKey(ascending=ascending, numPartitions=5) + self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending)) + sizes = sort.glom().map(len).collect() + for size in sizes: + self.assertGreater(size, 0) + class ProfilerTests(PySparkTestCase): |