aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-03-26 22:47:31 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2017-03-26 22:47:31 +0200
commit617ab6445ea33d8297f0691723fd19bae19228dc (patch)
tree54eff793a6e82c0e3a319f1b6647057b443c010d
parent362ee93296a0de6342b4339e941e6a11f445c5b2 (diff)
downloadspark-617ab6445ea33d8297f0691723fd19bae19228dc.tar.gz
spark-617ab6445ea33d8297f0691723fd19bae19228dc.tar.bz2
spark-617ab6445ea33d8297f0691723fd19bae19228dc.zip
[SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows
## What changes were proposed in this pull request? The `CollapseWindow` rule is currently too aggressive when collapsing adjacent windows. It also collapses windows in which the child produces a column that is consumed by the parent; this creates an invalid window which will fail at runtime. This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule. ## How was this patch tested? Added a new test case to `CollapseWindowSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17432 from hvanhovell/SPARK-20086.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala8
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala11
2 files changed, 16 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ee7de86921..dbe3ded4bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
/**
* Collapse Adjacent Window Expression.
- * - If the partition specs and order specs are the same, collapse into the parent.
+ * - If the partition specs and order specs are the same and the window expression are
+ * independent, collapse into the parent.
*/
object CollapseWindow extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
- w.copy(windowExpressions = we2 ++ we1, child = grandChild)
+ case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
+ if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty =>
+ w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 3f7d1d9fd9..52054c2f8b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest {
comparePlans(optimized2, correctAnswer2)
}
+
+ test("Don't collapse adjacent windows with dependent columns") {
+ val query = testRelation
+ .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1)
+ .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1)
+ .analyze
+
+ val expected = query.analyze
+ val optimized = Optimize.execute(query.analyze)
+ comparePlans(optimized, expected)
+ }
}