diff options
author | Eric Liang <ekl@google.com> | 2014-09-07 17:57:59 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-09-07 17:57:59 -0700 |
commit | 6754570d83044c4fbaf0d2ac2378a0e081a93629 (patch) | |
tree | d42b454b67cccbebcdc7e04186d924d4906f9b04 /core/src/main/scala | |
parent | 3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 (diff) | |
download | spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.gz spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.bz2 spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.zip |
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394
Author: Eric Liang <ekl@google.com>
Closes #2264 from ericl/spark-3394 and squashes the following commits:
c87355b [Eric Liang] refactor
bfb6140 [Eric Liang] change RDD takeOrdered instead
7a51528 [Eric Liang] fix takeordered when limit = 0
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1cf55e86f6..a9b905b0d1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag]( * @return an array of top elements */ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { - mapPartitions { items => - // Priority keeps the largest elements, so let's reverse the ordering. - val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered(items, num)(ord) - Iterator.single(queue) - }.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) + if (num == 0) { + Array.empty + } else { + mapPartitions { items => + // Priority keeps the largest elements, so let's reverse the ordering. + val queue = new BoundedPriorityQueue[T](num)(ord.reverse) + queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + Iterator.single(queue) + }.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } } /** |