aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala')
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala219
1 files changed, 219 insertions, 0 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 9174b4e649..e2cc80c564 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -758,4 +758,223 @@ class FilterPushdownSuite extends PlanTest {
val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze
comparePlans(optimized, correctedAnswer)
}
+
+ test("Window: predicate push down -- basic") {
+ val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
+ val correctAnswer = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- predicates with compound predicate using only one column") {
+ val winExpr =
+ windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15)
+ val correctAnswer = testRelation
+ .where('a * 3 > 15).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- multi window expressions with the same window spec") {
+ val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec)
+ val winExpr2 = windowExpr(sum('b), winSpec)
+ val originalQuery = testRelation
+ .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
+
+ val correctAnswer = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil,
+ 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- multi window specification - 1") {
+ // order by clauses are different between winSpec1 and winSpec2
+ val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec1)
+ val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame)
+ val winExpr2 = windowExpr(count('b), winSpec2)
+ val originalQuery = testRelation
+ .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
+
+ val correctAnswer1 = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ val correctAnswer2 = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ // When Analyzer adding Window operators after grouping the extracted Window Expressions
+ // based on their Partition and Order Specs, the order of Window operators is
+ // non-deterministic. Thus, we have two correct plans
+ val optimizedQuery = Optimize.execute(originalQuery.analyze)
+ try {
+ comparePlans(optimizedQuery, correctAnswer1)
+ } catch {
+ case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
+ }
+ }
+
+ test("Window: predicate push down -- multi window specification - 2") {
+ // partitioning clauses are different between winSpec1 and winSpec2
+ val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec1)
+ val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr2 = windowExpr(count('a), winSpec2)
+ val originalQuery = testRelation
+ .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1)
+
+ val correctAnswer1 = testRelation.select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
+ .where('b > 1)
+ .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'window1, 'b, 'c, 'window2).analyze
+
+ val correctAnswer2 = testRelation.select('a, 'b, 'c)
+ .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
+ .where('b > 1)
+ .select('a, 'window1, 'b, 'c, 'window2).analyze
+
+ val optimizedQuery = Optimize.execute(originalQuery.analyze)
+ // When Analyzer adding Window operators after grouping the extracted Window Expressions
+ // based on their Partition and Order Specs, the order of Window operators is
+ // non-deterministic. Thus, we have two correct plans
+ try {
+ comparePlans(optimizedQuery, correctAnswer1)
+ } catch {
+ case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
+ }
+ }
+
+ test("Window: predicate push down -- predicates with multiple partitioning columns") {
+ val winExpr =
+ windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .where('a + 'b > 1).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ // complex predicates with the same references but the same expressions
+ // Todo: in Analyzer, to enable it, we need to convert the expression in conditions
+ // to the alias that is defined as the same expression
+ ignore("Window: predicate push down -- complex predicate with the same expressions") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- predicates are not from partitioning keys") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is c > 1, but the partitioning key is (a, b).
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
+ val correctAnswer = testRelation.select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('c > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- partial compound partition key") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is a > 1, but the partitioning key is (a + b, b)
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+ val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('a > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- complex predicates containing non partitioning columns") {
+ val winSpec =
+ windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is a + b > 1, but the partitioning key is b.
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ // complex predicates with the same references but different expressions
+ test("Window: no predicate push down -- complex predicate with different expressions") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+
+ // No push down: the predicate is a + b > 1, but the partitioning key is a + b.
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1)
+ val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
+ .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
}