diff options
author | Eric Liang <ekl@google.com> | 2014-09-08 16:14:32 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-09-08 16:14:36 -0700 |
commit | 7db53391f1b349d1f49844197b34f94806f5e336 (patch) | |
tree | ea74f73a8da94ce378b5ffc058201b4ef8132330 /sql/core/src | |
parent | 08ce18881e09c6e91db9c410d1d9ce1e5ae63a62 (diff) | |
download | spark-7db53391f1b349d1f49844197b34f94806f5e336.tar.gz spark-7db53391f1b349d1f49844197b34f94806f5e336.tar.bz2 spark-7db53391f1b349d1f49844197b34f94806f5e336.zip |
[SPARK-3349][SQL] Output partitioning of limit should not be inherited from child
This resolves https://issues.apache.org/jira/browse/SPARK-3349
Author: Eric Liang <ekl@google.com>
Closes #2262 from ericl/spark-3349 and squashes the following commits:
3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 4 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 |
2 files changed, 20 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 47bff0c730..cac376608b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} import org.apache.spark.util.MutablePair /** @@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan) private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] override def output = child.output + override def outputPartitioning = SinglePartition /** * A custom implementation modeled after the take function on RDDs but which never runs any job @@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan) case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode { override def output = child.output + override def outputPartitioning = SinglePartition val ordering = new RowOrdering(sortOrder, child.output) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1ac2059377..e8fbc28d0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { (null, null, 6, "F") :: Nil) } + test("SPARK-3349 partitioning after limit") { + sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC") + .limit(2) + .registerTempTable("subset1") + sql("SELECT DISTINCT n FROM lowerCaseData") + .limit(2) + .registerTempTable("subset2") + checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"), + (3, "c", 3) :: + (4, "d", 4) :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"), + (1, "a", 1) :: + (2, "b", 2) :: Nil) + } + test("mixed-case keywords") { checkAnswer( sql( |