diff options
author | Eric Liang <ekl@google.com> | 2014-09-08 16:14:32 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-09-08 16:14:36 -0700 |
commit | 7db53391f1b349d1f49844197b34f94806f5e336 (patch) | |
tree | ea74f73a8da94ce378b5ffc058201b4ef8132330 /sql/core/src/main | |
parent | 08ce18881e09c6e91db9c410d1d9ce1e5ae63a62 (diff) | |
download | spark-7db53391f1b349d1f49844197b34f94806f5e336.tar.gz spark-7db53391f1b349d1f49844197b34f94806f5e336.tar.bz2 spark-7db53391f1b349d1f49844197b34f94806f5e336.zip |
[SPARK-3349][SQL] Output partitioning of limit should not be inherited from child
This resolves https://issues.apache.org/jira/browse/SPARK-3349
Author: Eric Liang <ekl@google.com>
Closes #2262 from ericl/spark-3349 and squashes the following commits:
3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 47bff0c730..cac376608b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} import org.apache.spark.util.MutablePair /** @@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan) private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] override def output = child.output + override def outputPartitioning = SinglePartition /** * A custom implementation modeled after the take function on RDDs but which never runs any job @@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan) case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output = child.output + override def outputPartitioning = SinglePartition val ordering = new RowOrdering(sortOrder, child.output) |