author     Josh Rosen <joshrosen@databricks.com>      2015-10-15 17:36:55 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-10-15 17:36:55 -0700
commit     eb0b4d6e2ddfb765f082d0d88472626336ad2609 (patch)
tree       18f542a940c01c8d924f48cbfec89244c5ab01ad /sql
parent     6a2359ff1f7ad2233af2c530313d6ec2ecf70d19 (diff)
[SPARK-11135] [SQL] Exchange incorrectly skips sorts when existing ordering is non-empty subset of required ordering
In Spark SQL, the Exchange planner tries to avoid unnecessary sorts in cases where the data has already been sorted by a superset of the requested sorting columns. For instance, let's say that a query calls for an operator's input to be sorted by `a.asc` and the input happens to already be sorted by `[a.asc, b.asc]`. In this case, we do not need to re-sort the input. The converse, however, is not true: if the query calls for `[a.asc, b.asc]`, then `a.asc` alone will not satisfy the ordering requirements, requiring an additional sort to be planned by Exchange.

The current Exchange code gets this wrong and incorrectly skips sorting when the existing output ordering is merely a subset of the required ordering. Fortunately, this is simple to fix.

This bug was introduced in https://github.com/apache/spark/pull/7458, so it affects 1.5.0+.

This patch fixes the bug and significantly improves the unit test coverage of Exchange's sort-planning logic.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9140 from JoshRosen/SPARK-11135.
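To make the satisfaction rule concrete, here is a minimal sketch (not Spark source; plain strings stand in for `SortOrder` expressions, and the object name is invented for illustration): a required ordering is satisfied only when it is a prefix of the child's existing output ordering.

```scala
// Minimal sketch, not Spark code: the ordering-satisfaction rule described above,
// with plain strings standing in for SortOrder expressions.
object OrderingSatisfactionSketch {
  // A required ordering is satisfied only if it is a prefix of the existing ordering.
  def satisfies(existing: Seq[String], required: Seq[String]): Boolean =
    required == existing.take(required.length)

  def main(args: Array[String]): Unit = {
    // Existing [a.asc, b.asc] satisfies required [a.asc]: no extra sort is needed.
    assert(satisfies(existing = Seq("a.asc", "b.asc"), required = Seq("a.asc")))
    // Existing [a.asc] does NOT satisfy required [a.asc, b.asc]: Exchange must plan a sort.
    assert(!satisfies(existing = Seq("a.asc"), required = Seq("a.asc", "b.asc")))
  }
}
```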
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala      |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala  | 49
2 files changed, 52 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 289453753f..1d3379a5e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -219,6 +219,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
var children: Seq[SparkPlan] = operator.children
+ assert(requiredChildDistributions.length == children.length)
+ assert(requiredChildOrderings.length == children.length)
// Ensure that the operator's children satisfy their output distribution requirements:
children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
@@ -248,8 +250,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
- val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
- if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
+ if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child)
} else {
child
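As a hedged illustration of the hunk above (again using strings in place of the actual `SortOrder` type, with invented names), the following sketch contrasts the old and new predicates on the SPARK-11135 case: the old check truncated both orderings to the shorter length, so an existing `[a.asc]` looked as if it satisfied a required `[a.asc, b.asc]` and the sort was skipped; the new check compares the full required ordering against a prefix of the existing one.

```scala
// Illustrative sketch only, not the Spark classes: why the old condition skipped a
// needed sort. Each function returns true when Exchange must plan a sort.
object Spark11135PredicateSketch {
  def needsSortOld(required: Seq[String], existing: Seq[String]): Boolean = {
    val minSize = Seq(required.size, existing.size).min
    minSize == 0 || required.take(minSize) != existing.take(minSize)
  }

  def needsSortNew(required: Seq[String], existing: Seq[String]): Boolean =
    required != existing.take(required.length)

  def main(args: Array[String]): Unit = {
    val required = Seq("a.asc", "b.asc")
    val existing = Seq("a.asc")
    // Old check: both sides truncated to length 1 and found equal, so no sort (the bug).
    assert(!needsSortOld(required, existing))
    // New check: the required ordering is not a prefix of the existing one, so a sort is added.
    assert(needsSortNew(required, existing))
  }
}
```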
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index cafa1d5154..ebdab1c26d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -354,6 +354,55 @@ class PlannerSuite extends SharedSQLContext {
}
}
+ test("EnsureRequirements adds sort when there is no existing ordering") {
+ val orderingA = SortOrder(Literal(1), Ascending)
+ val orderingB = SortOrder(Literal(2), Ascending)
+ assert(orderingA != orderingB)
+ val inputPlan = DummySparkPlan(
+ children = DummySparkPlan(outputOrdering = Seq.empty) :: Nil,
+ requiredChildOrdering = Seq(Seq(orderingB)),
+ requiredChildDistribution = Seq(UnspecifiedDistribution)
+ )
+ val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+ assertDistributionRequirementsAreSatisfied(outputPlan)
+ if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+ fail(s"Sort should have been added:\n$outputPlan")
+ }
+ }
+
+ test("EnsureRequirements skips sort when required ordering is prefix of existing ordering") {
+ val orderingA = SortOrder(Literal(1), Ascending)
+ val orderingB = SortOrder(Literal(2), Ascending)
+ assert(orderingA != orderingB)
+ val inputPlan = DummySparkPlan(
+ children = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)) :: Nil,
+ requiredChildOrdering = Seq(Seq(orderingA)),
+ requiredChildDistribution = Seq(UnspecifiedDistribution)
+ )
+ val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+ assertDistributionRequirementsAreSatisfied(outputPlan)
+ if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.nonEmpty) {
+ fail(s"No sorts should have been added:\n$outputPlan")
+ }
+ }
+
+ // This is a regression test for SPARK-11135
+ test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
+ val orderingA = SortOrder(Literal(1), Ascending)
+ val orderingB = SortOrder(Literal(2), Ascending)
+ assert(orderingA != orderingB)
+ val inputPlan = DummySparkPlan(
+ children = DummySparkPlan(outputOrdering = Seq(orderingA)) :: Nil,
+ requiredChildOrdering = Seq(Seq(orderingA, orderingB)),
+ requiredChildDistribution = Seq(UnspecifiedDistribution)
+ )
+ val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+ assertDistributionRequirementsAreSatisfied(outputPlan)
+ if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+ fail(s"Sort should have been added:\n$outputPlan")
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
}