aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala8
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala11
2 files changed, 16 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ee7de86921..dbe3ded4bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
/**
* Collapse Adjacent Window Expression.
- * - If the partition specs and order specs are the same, collapse into the parent.
+ * - If the partition specs and order specs are the same and the window expression are
+ * independent, collapse into the parent.
*/
object CollapseWindow extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
- w.copy(windowExpressions = we2 ++ we1, child = grandChild)
+ case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
+ if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty =>
+ w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 3f7d1d9fd9..52054c2f8b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest {
comparePlans(optimized2, correctAnswer2)
}
+
+ test("Don't collapse adjacent windows with dependent columns") {
+ val query = testRelation
+ .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1)
+ .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1)
+ .analyze
+
+ val expected = query.analyze
+ val optimized = Optimize.execute(query.analyze)
+ comparePlans(optimized, expected)
+ }
}