aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorEric Liang <ekl@google.com>2014-09-07 17:57:59 -0700
committerMatei Zaharia <matei@databricks.com>2014-09-07 17:57:59 -0700
commit6754570d83044c4fbaf0d2ac2378a0e081a93629 (patch)
treed42b454b67cccbebcdc7e04186d924d4906f9b04 /core/src/main/scala
parent3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 (diff)
downloadspark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.gz
spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.bz2
spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.zip
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394 Author: Eric Liang <ekl@google.com> Closes #2264 from ericl/spark-3394 and squashes the following commits: c87355b [Eric Liang] refactor bfb6140 [Eric Liang] change RDD takeOrdered instead 7a51528 [Eric Liang] fix takeordered when limit = 0
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala22
1 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1cf55e86f6..a9b905b0d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- // Priority keeps the largest elements, so let's reverse the ordering.
- val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
- queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord)
+ if (num == 0) {
+ Array.empty
+ } else {
+ mapPartitions { items =>
+ // Priority keeps the largest elements, so let's reverse the ordering.
+ val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+ queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
}
/**