aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-11-28 02:56:26 -0800
committerHerman van Hovell <hvanhovell@databricks.com>2016-11-28 02:56:26 -0800
commit454b8049916a0353772a0ea5cfe14b62cbd81df4 (patch)
tree95130442a7687844e76f6aca6e3553c8027f396c /sql
parent87141622ee6b11ac177f68f58d0dc5f8b9a9f948 (diff)
downloadspark-454b8049916a0353772a0ea5cfe14b62cbd81df4.tar.gz
spark-454b8049916a0353772a0ea5cfe14b62cbd81df4.tar.bz2
spark-454b8049916a0353772a0ea5cfe14b62cbd81df4.zip
[SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.
## What changes were proposed in this pull request? The `CollapseWindow` optimizer rule changes the order of output attributes. This modifies the output of the plan, which the optimizer cannot do. This also breaks things like `collect()` for which we use a `RowEncoder` that assumes that the output attributes of the executed plan are equal to those outputted by the logical plan. ## How was this patch tested? I have updated an incorrect test in `CollapseWindowSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #16027 from hvanhovell/SPARK-18604.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala13
2 files changed, 9 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6ba8b33b3f..2679e026bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
object CollapseWindow extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
- w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+ w.copy(windowExpressions = we2 ++ we1, child = grandChild)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 797076e55c..3f7d1d9fd9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest {
.window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
.window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
- val optimized = Optimize.execute(query.analyze)
+ val analyzed = query.analyze
+ val optimized = Optimize.execute(analyzed)
+ assert(analyzed.output === optimized.output)
+
val correctAnswer = testRelation.window(Seq(
- avg(b).as('avg_b),
- sum(b).as('sum_b),
- max(a).as('max_a),
- min(a).as('min_a)), partitionSpec1, orderSpec1)
+ min(a).as('min_a),
+ max(a).as('max_a),
+ sum(b).as('sum_b),
+ avg(b).as('avg_b)), partitionSpec1, orderSpec1)
comparePlans(optimized, correctAnswer)
}