aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorYash Datta <Yash.Datta@guavus.com>2014-12-29 13:49:45 -0800
committerReynold Xin <rxin@databricks.com>2014-12-29 13:49:45 -0800
commit9bc0df6804f241aff24520d9c6ec54d9b11f5785 (patch)
treedf233fb21efa57191b33083834348e59124ab189 /core
parent02b55de3dce9a1fef806be13e5cefa0f39ea2fcc (diff)
downloadspark-9bc0df6804f241aff24520d9c6ec54d9b11f5785.tar.gz
spark-9bc0df6804f241aff24520d9c6ec54d9b11f5785.tar.bz2
spark-9bc0df6804f241aff24520d9c6ec54d9b11f5785.zip
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
takeOrdered should skip reduce step in case mapped RDDs have no partitions. This prevents the mentioned exception : 4. run query SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100; Error trace java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.reduce(RDD.scala:863) at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136) Author: Yash Datta <Yash.Datta@guavus.com> Closes #3830 from saucam/fix_takeorder and squashes the following commits: 5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala15
1 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f47c2d1fcd..5118e2b911 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1146,15 +1146,20 @@ abstract class RDD[T: ClassTag](
if (num == 0) {
Array.empty
} else {
- mapPartitions { items =>
+ val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord)
+ }
+ if (mapRDDs.partitions.size == 0) {
+ Array.empty
+ } else {
+ mapRDDs.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
}
}