diff options
Diffstat (limited to 'sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala')
-rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 219 |
1 files changed, 219 insertions, 0 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 9174b4e649..e2cc80c564 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -758,4 +758,223 @@ class FilterPushdownSuite extends PlanTest { val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze comparePlans(optimized, correctedAnswer) } + + test("Window: predicate push down -- basic") { + val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) + val correctAnswer = testRelation + .where('a > 1).select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: predicate push down -- predicates with compound predicate using only one column") { + val winExpr = + windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15) + val correctAnswer = testRelation + .where('a * 3 > 15).select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: predicate push down -- multi window expressions with the same window spec") { + val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr1 = windowExpr(count('b), winSpec) + val winExpr2 = windowExpr(sum('b), winSpec) + val originalQuery = testRelation + .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1) + + val correctAnswer = testRelation + .where('a > 1).select('a, 'b, 'c) + .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil, + 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: predicate push down -- multi window specification - 1") { + // order by clauses are different between winSpec1 and winSpec2 + val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr1 = windowExpr(count('b), winSpec1) + val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame) + val winExpr2 = windowExpr(count('b), winSpec2) + val originalQuery = testRelation + .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1) + + val correctAnswer1 = testRelation + .where('a > 1).select('a, 'b, 'c) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze + + val correctAnswer2 = testRelation + .where('a > 1).select('a, 'b, 'c) + .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze + + // When Analyzer adding Window operators after grouping the extracted Window Expressions + // based on their Partition and Order Specs, the order of Window operators is + // non-deterministic. Thus, we have two correct plans + val optimizedQuery = Optimize.execute(originalQuery.analyze) + try { + comparePlans(optimizedQuery, correctAnswer1) + } catch { + case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2) + } + } + + test("Window: predicate push down -- multi window specification - 2") { + // partitioning clauses are different between winSpec1 and winSpec2 + val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr1 = windowExpr(count('b), winSpec1) + val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr2 = windowExpr(count('a), winSpec2) + val originalQuery = testRelation + .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1) + + val correctAnswer1 = testRelation.select('a, 'b, 'c) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil) + .where('b > 1) + .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'window1, 'b, 'c, 'window2).analyze + + val correctAnswer2 = testRelation.select('a, 'b, 'c) + .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil) + .where('b > 1) + .select('a, 'window1, 'b, 'c, 'window2).analyze + + val optimizedQuery = Optimize.execute(originalQuery.analyze) + // When Analyzer adding Window operators after grouping the extracted Window Expressions + // based on their Partition and Order Specs, the order of Window operators is + // non-deterministic. Thus, we have two correct plans + try { + comparePlans(optimizedQuery, correctAnswer1) + } catch { + case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2) + } + } + + test("Window: predicate push down -- predicates with multiple partitioning columns") { + val winExpr = + windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .where('a + 'b > 1).select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + // complex predicates with the same references but the same expressions + // Todo: in Analyzer, to enable it, we need to convert the expression in conditions + // to the alias that is defined as the same expression + ignore("Window: predicate push down -- complex predicate with the same expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: no predicate push down -- predicates are not from partitioning keys") { + val winSpec = windowSpec( + partitionSpec = 'a.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is c > 1, but the partitioning key is (a, b). + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) + val correctAnswer = testRelation.select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .where('c > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: no predicate push down -- partial compound partition key") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a > 1, but the partitioning key is (a + b, b) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil) + .where('a > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: no predicate push down -- complex predicates containing non partitioning columns") { + val winSpec = + windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a + b > 1, but the partitioning key is b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + // complex predicates with the same references but different expressions + test("Window: no predicate push down -- complex predicate with different expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + // No push down: the predicate is a + b > 1, but the partitioning key is a + b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1) + val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } } |