diff options
author | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-26 22:47:31 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-26 22:47:31 +0200 |
commit | 617ab6445ea33d8297f0691723fd19bae19228dc (patch) | |
tree | 54eff793a6e82c0e3a319f1b6647057b443c010d /sql/catalyst | |
parent | 362ee93296a0de6342b4339e941e6a11f445c5b2 (diff) | |
download | spark-617ab6445ea33d8297f0691723fd19bae19228dc.tar.gz spark-617ab6445ea33d8297f0691723fd19bae19228dc.tar.bz2 spark-617ab6445ea33d8297f0691723fd19bae19228dc.zip |
[SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows
## What changes were proposed in this pull request?
The `CollapseWindow` rule is currently too aggressive when collapsing adjacent windows. It also collapses windows in which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime.
This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule.
## How was this patch tested?
Added a new test case to `CollapseWindowSuite` which verifies that adjacent windows with dependent columns are not collapsed.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17432 from hvanhovell/SPARK-20086.
Diffstat (limited to 'sql/catalyst')
2 files changed, 16 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ee7de86921..dbe3ded4bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] { /** * Collapse Adjacent Window Expression. - * - If the partition specs and order specs are the same, collapse into the parent. + * - If the partition specs and order specs are the same and the window expression are + * independent, collapse into the parent. */ object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 => - w.copy(windowExpressions = we2 ++ we1, child = grandChild) + case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) + if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty => + w1.copy(windowExpressions = we2 ++ we1, child = grandChild) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 3f7d1d9fd9..52054c2f8b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest { comparePlans(optimized2, correctAnswer2) } + + test("Don't collapse adjacent windows with dependent columns") { + val query = testRelation + .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1) + .window(Seq(max('sum_a).as('max_sum_a)), 
partitionSpec1, orderSpec1) + .analyze + + val expected = query.analyze + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, expected) + } } |