diff options
author | Eric Liang <ekl@google.com> | 2014-09-07 17:57:59 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-09-07 17:57:59 -0700 |
commit | 6754570d83044c4fbaf0d2ac2378a0e081a93629 (patch) | |
tree | d42b454b67cccbebcdc7e04186d924d4906f9b04 /core | |
parent | 3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 (diff) | |
download | spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.gz spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.bz2 spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.zip |
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394
Author: Eric Liang <ekl@google.com>
Closes #2264 from ericl/spark-3394 and squashes the following commits:
c87355b [Eric Liang] refactor
bfb6140 [Eric Liang] change RDD takeOrdered instead
7a51528 [Eric Liang] fix takeordered when limit = 0
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 22 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 7 |
2 files changed, 20 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1cf55e86f6..a9b905b0d1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag]( * @return an array of top elements */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { - mapPartitions { items => - // Priority keeps the largest elements, so let's reverse the ordering. - val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered(items, num)(ord) - Iterator.single(queue) - }.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) + if (num == 0) { + Array.empty + } else { + mapPartitions { items => + // Priority keeps the largest elements, so let's reverse the ordering. + val queue = new BoundedPriorityQueue[T](num)(ord.reverse) + queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + Iterator.single(queue) + }.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 926d4fecb5..499dcda3da 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sortedLowerK === Array(1, 2, 3, 4, 5)) } + test("takeOrdered with limit 0") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val rdd = sc.makeRDD(nums, 2) + val sortedLowerK = rdd.takeOrdered(0) + assert(sortedLowerK.size === 0) + } + test("takeOrdered with custom ordering") { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) implicit val ord = implicitly[Ordering[Int]].reverse |