From 617ab6445ea33d8297f0691723fd19bae19228dc Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Sun, 26 Mar 2017 22:47:31 +0200 Subject: [SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows ## What changes were proposed in this pull request? The `CollapseWindow` is currently to aggressive when collapsing adjacent windows. It also collapses windows in the which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime. This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule. ## How was this patch tested? Added a new test case to `CollapseWindowSuite` Author: Herman van Hovell Closes #17432 from hvanhovell/SPARK-20086. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 +++++--- .../spark/sql/catalyst/optimizer/CollapseWindowSuite.scala | 11 +++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ee7de86921..dbe3ded4bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] { /** * Collapse Adjacent Window Expression. - * - If the partition specs and order specs are the same, collapse into the parent. + * - If the partition specs and order specs are the same and the window expression are + * independent, collapse into the parent. */ object CollapseWindow extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 => - w.copy(windowExpressions = we2 ++ we1, child = grandChild) + case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild)) + if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty => + w1.copy(windowExpressions = we2 ++ we1, child = grandChild) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala index 3f7d1d9fd9..52054c2f8b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala @@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest { comparePlans(optimized2, correctAnswer2) } + + test("Don't collapse adjacent windows with dependent columns") { + val query = testRelation + .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1) + .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1) + .analyze + + val expected = query.analyze + val optimized = Optimize.execute(query.analyze) + comparePlans(optimized, expected) + } } -- cgit v1.2.3