aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-07 18:42:24 -0700
committerReynold Xin <rxin@apache.org>2014-09-07 18:42:24 -0700
commite2614038e78f4693fafedeee15b6fdf0ea1be473 (patch)
tree969222dc5c92a5da08a7d119d4c9a01e57446f79 /sql/core
parent39db1bfdab434c867044ad4c70fe93a96fb287ad (diff)
downloadspark-e2614038e78f4693fafedeee15b6fdf0ea1be473.tar.gz
spark-e2614038e78f4693fafedeee15b6fdf0ea1be473.tar.bz2
spark-e2614038e78f4693fafedeee15b6fdf0ea1be473.zip
[SPARK-3408] Fixed Limit operator so it works with sort-based shuffle.
Author: Reynold Xin <rxin@apache.org>. Closes #2281 from rxin/sql-limit-sort and squashes the following commits: 1ef7780 [Reynold Xin] [SPARK-3408] Fixed Limit operator so it works with sort-based shuffle.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala19
1 file changed, 14 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4abda21ffe..47bff0c730 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe.TypeTag
+import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
@@ -96,6 +96,9 @@ case class Limit(limit: Int, child: SparkPlan)
// TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
// partition local limit -> exchange into one partition -> partition local limit again
+ /** We must copy rows when sort based shuffle is on */
+ private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+
override def output = child.output
/**
@@ -143,9 +146,15 @@ case class Limit(limit: Int, child: SparkPlan)
}
override def execute() = {
- val rdd = child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Boolean, Row]()
- iter.take(limit).map(row => mutablePair.update(false, row))
+ val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter =>
+ iter.take(limit).map(row => (false, row.copy()))
+ }
+ } else {
+ child.execute().mapPartitions { iter =>
+ val mutablePair = new MutablePair[Boolean, Row]()
+ iter.take(limit).map(row => mutablePair.update(false, row))
+ }
}
val part = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)