aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorYash Datta <Yash.Datta@guavus.com>2014-12-29 13:49:45 -0800
committerReynold Xin <rxin@databricks.com>2014-12-29 13:50:34 -0800
commite81c869677b566dfcabedca89a40aeea7dc16fa9 (patch)
tree855edb54b8e5c94496b20edf1c850d51a32431f0 /core
parent76046664dc9bd830b10c9e4786c211b4407a81e0 (diff)
downloadspark-e81c869677b566dfcabedca89a40aeea7dc16fa9.tar.gz
spark-e81c869677b566dfcabedca89a40aeea7dc16fa9.tar.bz2
spark-e81c869677b566dfcabedca89a40aeea7dc16fa9.zip
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
takeOrdered should skip reduce step in case mapped RDDs have no partitions. This prevents the mentioned exception : 4. run query SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100; Error trace java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.reduce(RDD.scala:863) at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136) Author: Yash Datta <Yash.Datta@guavus.com> Closes #3830 from saucam/fix_takeorder and squashes the following commits: 5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions (cherry picked from commit 9bc0df6804f241aff24520d9c6ec54d9b11f5785) Signed-off-by: Reynold Xin <rxin@databricks.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala15
1 files changed, 10 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ff6d9465b4..c26425dea0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1132,15 +1132,20 @@ abstract class RDD[T: ClassTag](
if (num == 0) {
Array.empty
} else {
- mapPartitions { items =>
+ val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord)
+ }
+ if (mapRDDs.partitions.size == 0) {
+ Array.empty
+ } else {
+ mapRDDs.reduce { (queue1, queue2) =>
+ queue1 ++= queue2
+ queue1
+ }.toArray.sorted(ord)
+ }
}
}