about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorEric Liang <ekl@google.com>2014-09-07 17:57:59 -0700
committerMatei Zaharia <matei@databricks.com>2014-09-07 17:57:59 -0700
commit6754570d83044c4fbaf0d2ac2378a0e081a93629 (patch)
treed42b454b67cccbebcdc7e04186d924d4906f9b04
parent3fb57a0ab3d76fda2301dbe9f2f3fa6743b4ed78 (diff)
downloadspark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.gz
spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.tar.bz2
spark-6754570d83044c4fbaf0d2ac2378a0e081a93629.zip
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394 Author: Eric Liang <ekl@google.com> Closes #2264 from ericl/spark-3394 and squashes the following commits: c87355b [Eric Liang] refactor bfb6140 [Eric Liang] change RDD takeOrdered instead 7a51528 [Eric Liang] fix takeordered when limit = 0
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 22
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 7
2 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1cf55e86f6..a9b905b0d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- // Priority keeps the largest elements, so let's reverse the ordering.
- val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
- queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord)
+ if (num == 0) {
+ Array.empty
+ } else {
+ mapPartitions { items =>
+ // Priority keeps the largest elements, so let's reverse the ordering.
+ val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+ queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+ Iterator.single(queue)
+ }.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 926d4fecb5..499dcda3da 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedLowerK === Array(1, 2, 3, 4, 5))
}
+ test("takeOrdered with limit 0") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedLowerK = rdd.takeOrdered(0)
+ assert(sortedLowerK.size === 0)
+ }
+
test("takeOrdered with custom ordering") {
val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
implicit val ord = implicitly[Ordering[Int]].reverse