author | Yash Datta <Yash.Datta@guavus.com> | 2014-12-29 13:49:45 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-12-29 13:49:45 -0800 |
commit | 9bc0df6804f241aff24520d9c6ec54d9b11f5785 (patch) | |
tree | df233fb21efa57191b33083834348e59124ab189 /core | |
parent | 02b55de3dce9a1fef806be13e5cefa0f39ea2fcc (diff) | |
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
takeOrdered should skip the reduce step when the mapped RDD has no partitions. This prevents the exception shown below:
4. run query
SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100;
Error trace
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:863)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136)
Author: Yash Datta <Yash.Datta@guavus.com>
Closes #3830 from saucam/fix_takeorder and squashes the following commits:
5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f47c2d1fcd..5118e2b911 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,15 +1146,20 @@ abstract class RDD[T: ClassTag](
     if (num == 0) {
       Array.empty
     } else {
-      mapPartitions { items =>
+      val mapRDDs = mapPartitions { items =>
         // Priority keeps the largest elements, so let's reverse the ordering.
         val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
         queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
         Iterator.single(queue)
-      }.reduce { (queue1, queue2) =>
-        queue1 ++= queue2
-        queue1
-      }.toArray.sorted(ord)
+      }
+      if (mapRDDs.partitions.size == 0) {
+        Array.empty
+      } else {
+        mapRDDs.reduce { (queue1, queue2) =>
+          queue1 ++= queue2
+          queue1
+        }.toArray.sorted(ord)
+      }
     }
   }
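
The guard in this diff can be illustrated without a Spark cluster. The following is a minimal sketch, not the Spark implementation: it assumes a hypothetical `Partitions[T]` alias in which each inner `Seq` stands in for one RDD partition, and it replaces `BoundedPriorityQueue` with a plain sort-and-take. `TakeOrderedSketch` and its `takeOrdered` are illustrative names only. Plain Scala collections fail in a comparable way, since `reduce` on an empty collection throws `UnsupportedOperationException`, so the same check before reducing applies.

```scala
object TakeOrderedSketch {
  // Hypothetical stand-in for an RDD: each inner Seq plays the role of one partition.
  type Partitions[T] = Seq[Seq[T]]

  def takeOrdered[T](parts: Partitions[T], num: Int)(implicit ord: Ordering[T]): Seq[T] = {
    if (num == 0) {
      Seq.empty
    } else {
      // Map step: one result per partition, holding at most `num` smallest elements.
      // (Assumption: sort-and-take replaces Spark's BoundedPriorityQueue here.)
      val mapped: Seq[Seq[T]] = parts.map(p => p.sorted(ord).take(num))
      if (mapped.isEmpty) {
        // Zero partitions means zero map results, so reduce would throw.
        Seq.empty
      } else {
        // Reduce step: merge two partial results and keep the `num` smallest.
        mapped.reduce((a, b) => (a ++ b).sorted(ord).take(num))
      }
    }
  }
}
```

Note that a partition that exists but holds no elements still contributes one (empty) map result, so only the zero-partition case needs the guard. A descending query such as the `ORDER BY End_Time DESC LIMIT 100` above would correspond to passing a reversed ordering, for example `TakeOrderedSketch.takeOrdered(parts, 100)(Ordering[Int].reverse)`.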