author     Yash Datta <Yash.Datta@guavus.com>    2014-12-29 13:49:45 -0800
committer  Reynold Xin <rxin@databricks.com>     2014-12-29 13:50:34 -0800
commit     e81c869677b566dfcabedca89a40aeea7dc16fa9 (patch)
tree       855edb54b8e5c94496b20edf1c850d51a32431f0
parent     76046664dc9bd830b10c9e4786c211b4407a81e0 (diff)
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
takeOrdered should skip the reduce step when the mapped RDD has no partitions. This prevents the exception shown below:
To reproduce (the final step of the reported scenario), run the query:
SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100;
Error trace
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:863)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136)
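The root cause is that `reduce` has no identity element, so calling it on an empty collection throws `UnsupportedOperationException`; `RDD.reduce` behaves the same way when the mapped RDD contributes no partitions. A minimal illustration with a plain Scala collection (not Spark itself):

```scala
// reduce cannot produce a result from an empty collection, so it throws
// UnsupportedOperationException("empty collection") -- the same failure
// mode that surfaces in the stack trace above for RDD.reduce.
val threw =
  try {
    Seq.empty[Int].reduce(_ + _)
    false
  } catch {
    case _: UnsupportedOperationException => true
  }

println(s"reduce on empty collection threw: $threw")
```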
Author: Yash Datta <Yash.Datta@guavus.com>
Closes #3830 from saucam/fix_takeorder and squashes the following commits:
5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
(cherry picked from commit 9bc0df6804f241aff24520d9c6ec54d9b11f5785)
Signed-off-by: Reynold Xin <rxin@databricks.com>
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ff6d9465b4..c26425dea0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1132,15 +1132,20 @@ abstract class RDD[T: ClassTag](
     if (num == 0) {
       Array.empty
     } else {
-      mapPartitions { items =>
+      val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
-      }.reduce { (queue1, queue2) =>
-        queue1 ++= queue2
-        queue1
-      }.toArray.sorted(ord)
+      }
+      if (mapRDDs.partitions.size == 0) {
+        Array.empty
+      } else {
+        mapRDDs.reduce { (queue1, queue2) =>
+          queue1 ++= queue2
+          queue1
+        }.toArray.sorted(ord)
+      }
     }
   }
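The shape of the fix can be mimicked outside Spark. The sketch below uses a hypothetical `takeOrderedLocal` helper, with plain `Seq`s standing in for partitions; it applies the same guard as the patch: when there are no per-partition results, return empty instead of calling `reduce`.

```scala
// Hypothetical stand-in for the patched RDD.takeOrdered: each inner Seq
// plays the role of one partition's elements. Not Spark code.
def takeOrderedLocal[T](partitions: Seq[Seq[T]], num: Int)
                       (implicit ord: Ordering[T]): Seq[T] = {
  if (num == 0) {
    Seq.empty
  } else {
    // Per-partition "map" step: keep only the num smallest elements,
    // standing in for the BoundedPriorityQueue in the real code.
    val perPartition = partitions.map(items => items.sorted(ord).take(num))
    if (perPartition.isEmpty) {
      // The SPARK-4968 guard: skip reduce entirely when nothing was mapped,
      // avoiding UnsupportedOperationException("empty collection").
      Seq.empty
    } else {
      // "Reduce" step: merge the per-partition winners, then re-sort.
      perPartition.reduce(_ ++ _).sorted(ord).take(num)
    }
  }
}

println(takeOrderedLocal(Seq(Seq(5, 1, 3), Seq(4, 2)), 3)) // smallest three overall
println(takeOrderedLocal(Seq.empty[Seq[Int]], 3))          // no partitions: empty, no crash
```

Without the `perPartition.isEmpty` guard, the empty-partitions call would hit `reduce` on an empty collection and throw, which is exactly the failure the patch prevents.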