author     gatorsmile <gatorsmile@gmail.com>            2016-04-25 22:32:34 +0200
committer  Herman van Hovell <hvanhovell@questtec.nl>   2016-04-25 22:32:34 +0200
commit     0c47e274ab8c286498fa002e2c92febcb53905c6 (patch)
tree       7c90dee19ad986fa2d65f9b5942208a279578ce2
parent     3c5e65c339a9b4d5e01375d7f073e444898d34c8 (diff)
[SPARK-13739][SQL] Push Predicate Through Window
#### What changes were proposed in this pull request?

For performance, predicates can be pushed through Window if and only if the following conditions are satisfied:
1. All the expressions are part of the window partitioning key. The expressions can be compound.
2. The predicates are deterministic.

#### How was this patch tested?

- [X] Modified the DSL to support window expressions.
- [X] Added more tests.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #11635 from gatorsmile/pushPredicateThroughWindow.
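In DSL terms, the rewrite looks like the sketch below. It mirrors the basic test case added in FilterPushdownSuite; `testRelation` is assumed to be a LocalRelation with columns a, b, c, as in that suite:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.UnspecifiedFrame

    val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

    // Before: the Filter sits above the Window, so every row is partitioned and sorted.
    val original = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)

    // After: 'a > 1 is deterministic and references only the partitioning key 'a,
    // so the Filter is evaluated beneath the Window operator.
    val optimized = testRelation
      .where('a > 1).select('a, 'b, 'c)
      .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
      .select('a, 'b, 'c, 'window)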
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala9
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala22
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala43
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala219
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala34
5 files changed, 294 insertions, 33 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 085e95f542..b5d10e4a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -175,6 +175,15 @@ package object dsl {
Invoke(function, "apply", returnType, argument :: Nil)
}
+ def windowSpec(
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
+ frame: WindowFrame): WindowSpecDefinition =
+ WindowSpecDefinition(partitionSpec, orderSpec, frame)
+
+ def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression =
+ WindowExpression(windowFunc, windowSpec)
+
implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s: String = sym.name }
// TODO more implicit class for literal?
implicit class DslString(val s: String) extends ImplicitOperators {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 660314f86e..b26ceba228 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -959,6 +959,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+ // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
+ // pushed beneath must satisfy the following two conditions:
+ // 1. All the expressions are part of the window partitioning key. The expressions can be compound.
+ // 2. They are all deterministic.
+ case filter @ Filter(condition, w: Window)
+ if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
+ val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+ cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
+ // This ensures that all the partitioning expressions have been converted to aliases
+ // by the Analyzer, so we do not need to check whether the expressions in the
+ // condition are the same as the expressions used in the partitioning columns.
+ partitionAttrs.forall(_.isInstanceOf[Attribute])
+ }
+ if (pushDown.nonEmpty) {
+ val pushDownPredicate = pushDown.reduce(And)
+ val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
+ if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow)
+ } else {
+ filter
+ }
+
case filter @ Filter(condition, aggregate: Aggregate) =>
// Find all the aliased expressions in the aggregate list that don't include any actual
// AggregateExpression, and create a map from the alias to the expression
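To make the new Window case concrete, here is a hedged trace under assumed inputs (a Window partitioned by a, and the filter condition a > 1 && c > 1; these expressions are illustrative, not part of the patch):

    // splitConjunctivePredicates(a > 1 && c > 1)  ==>  Seq(a > 1, c > 1)
    // pushDown = Seq(a > 1)  -- its references are a subset of {a} and it is deterministic
    // stayUp   = Seq(c > 1)  -- c is not a partitioning attribute
    // result:  Filter(c > 1, Window(..., child = Filter(a > 1, child)))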
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 52b574c0e6..b5664a5e69 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -267,17 +266,11 @@ class ColumnPruningSuite extends PlanTest {
test("Column pruning on Window with useless aggregate functions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
+ val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
- val originalQuery =
- input.groupBy('a, 'c, 'd)('a, 'c, 'd,
- WindowExpression(
- AggregateExpression(Count('b), Complete, isDistinct = false),
- WindowSpecDefinition( 'a :: Nil,
- SortOrder('b, Ascending) :: Nil,
- UnspecifiedFrame)).as('window)).select('a, 'c)
-
+ val originalQuery = input.groupBy('a, 'c, 'd)('a, 'c, 'd, winExpr.as('window)).select('a, 'c)
val correctAnswer = input.select('a, 'c, 'd).groupBy('a, 'c, 'd)('a, 'c).analyze
-
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, correctAnswer)
@@ -285,25 +278,15 @@ class ColumnPruningSuite extends PlanTest {
test("Column pruning on Window with selected agg expressions") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
+ val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
val originalQuery =
- input.select('a, 'b, 'c, 'd,
- WindowExpression(
- AggregateExpression(Count('b), Complete, isDistinct = false),
- WindowSpecDefinition( 'a :: Nil,
- SortOrder('b, Ascending) :: Nil,
- UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c)
-
+ input.select('a, 'b, 'c, 'd, winExpr.as('window)).where('window > 1).select('a, 'c)
val correctAnswer =
input.select('a, 'b, 'c)
- .window(WindowExpression(
- AggregateExpression(Count('b), Complete, isDistinct = false),
- WindowSpecDefinition( 'a :: Nil,
- SortOrder('b, Ascending) :: Nil,
- UnspecifiedFrame)).as('window) :: Nil,
- 'a :: Nil, 'b.asc :: Nil)
+ .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
.where('window > 1).select('a, 'c).analyze
-
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, correctAnswer)
@@ -311,17 +294,11 @@ class ColumnPruningSuite extends PlanTest {
test("Column pruning on Window in select") {
val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int)
+ val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
- val originalQuery =
- input.select('a, 'b, 'c, 'd,
- WindowExpression(
- AggregateExpression(Count('b), Complete, isDistinct = false),
- WindowSpecDefinition( 'a :: Nil,
- SortOrder('b, Ascending) :: Nil,
- UnspecifiedFrame)).as('window)).select('a, 'c)
-
+ val originalQuery = input.select('a, 'b, 'c, 'd, winExpr.as('window)).select('a, 'c)
val correctAnswer = input.select('a, 'c).analyze
-
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, correctAnswer)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 9174b4e649..e2cc80c564 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -758,4 +758,223 @@ class FilterPushdownSuite extends PlanTest {
val correctedAnswer = agg.copy(child = agg.child.where(a > 1 && b > 2)).analyze
comparePlans(optimized, correctedAnswer)
}
+
+ test("Window: predicate push down -- basic") {
+ val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
+ val correctAnswer = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- predicates with compound predicate using only one column") {
+ val winExpr =
+ windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15)
+ val correctAnswer = testRelation
+ .where('a * 3 > 15).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- multi window expressions with the same window spec") {
+ val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec)
+ val winExpr2 = windowExpr(sum('b), winSpec)
+ val originalQuery = testRelation
+ .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
+
+ val correctAnswer = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil,
+ 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: predicate push down -- multi window specification - 1") {
+ // order by clauses are different between winSpec1 and winSpec2
+ val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec1)
+ val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame)
+ val winExpr2 = windowExpr(count('b), winSpec2)
+ val originalQuery = testRelation
+ .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1)
+
+ val correctAnswer1 = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ val correctAnswer2 = testRelation
+ .where('a > 1).select('a, 'b, 'c)
+ .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window1, 'window2).analyze
+
+ // When the Analyzer adds Window operators after grouping the extracted window
+ // expressions by their partition and order specs, the order of the Window
+ // operators is non-deterministic. Thus, there are two correct plans.
+ val optimizedQuery = Optimize.execute(originalQuery.analyze)
+ try {
+ comparePlans(optimizedQuery, correctAnswer1)
+ } catch {
+ case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
+ }
+ }
+
+ test("Window: predicate push down -- multi window specification - 2") {
+ // partitioning clauses are different between winSpec1 and winSpec2
+ val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr1 = windowExpr(count('b), winSpec1)
+ val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr2 = windowExpr(count('a), winSpec2)
+ val originalQuery = testRelation
+ .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1)
+
+ val correctAnswer1 = testRelation.select('a, 'b, 'c)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
+ .where('b > 1)
+ .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'window1, 'b, 'c, 'window2).analyze
+
+ val correctAnswer2 = testRelation.select('a, 'b, 'c)
+ .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil)
+ .where('b > 1)
+ .select('a, 'window1, 'b, 'c, 'window2).analyze
+
+ val optimizedQuery = Optimize.execute(originalQuery.analyze)
+ // When the Analyzer adds Window operators after grouping the extracted window
+ // expressions by their partition and order specs, the order of the Window
+ // operators is non-deterministic. Thus, there are two correct plans.
+ try {
+ comparePlans(optimizedQuery, correctAnswer1)
+ } catch {
+ case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2)
+ }
+ }
+
+ test("Window: predicate push down -- predicates with multiple partitioning columns") {
+ val winExpr =
+ windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .where('a + 'b > 1).select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ // Complex predicates with the same references and the same expressions.
+ // TODO: to enable this in the Analyzer, we need to convert the expression in the
+ // condition to the alias that is defined by the same expression.
+ ignore("Window: predicate push down -- complex predicate with the same expressions") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
+ .select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- predicates are not from partitioning keys") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is c > 1, but the partitioning key is (a, b).
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1)
+ val correctAnswer = testRelation.select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('c > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- partial compound partition key") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is a > 1, but the partitioning key is (a + b, b).
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+ val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('a > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ test("Window: no predicate push down -- complex predicates containing non partitioning columns") {
+ val winSpec =
+ windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ // No push down: the predicate is a + b > 1, but the partitioning key is b.
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1)
+ val correctAnswer = testRelation
+ .select('a, 'b, 'c)
+ .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil)
+ .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
+
+ // complex predicates with the same references but different expressions
+ test("Window: no predicate push down -- complex predicate with different expressions") {
+ val winSpec = windowSpec(
+ partitionSpec = 'a.attr + 'b.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExpr = windowExpr(count('b), winSpec)
+
+ val winSpecAnalyzed = windowSpec(
+ partitionSpec = '_w0.attr :: Nil,
+ orderSpec = 'b.asc :: Nil,
+ UnspecifiedFrame)
+ val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed)
+
+ // No push down: the predicate is a - b > 1, but the partitioning key is a + b.
+ val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1)
+ val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0"))
+ .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil)
+ .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze
+
+ comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
index 2bcbb1983f..91095af0dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
@@ -354,4 +354,38 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
val df = src.select($"*", max("c").over(winSpec) as "max")
checkAnswer(df, Row(5, Row(0, 3), 5))
}
+
+ test("aggregation and rows between with unbounded + predicate pushdown") {
+ val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
+ df.registerTempTable("window_table")
+ val selectList = Seq($"key", $"value",
+ last("key").over(
+ Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)),
+ last("key").over(
+ Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
+ last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 1)))
+
+ checkAnswer(
+ df.select(selectList: _*).where($"value" < "3"),
+ Seq(Row(1, "1", 1, 1, 1), Row(2, "2", 3, 2, 3), Row(3, "2", 3, 3, 3)))
+ }
+
+ test("aggregation and range between with unbounded + predicate pushdown") {
+ val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value")
+ df.registerTempTable("window_table")
+ val selectList = Seq($"key", $"value",
+ last("value").over(
+ Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1)).equalTo("2")
+ .as("last_v"),
+ avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1))
+ .as("avg_key1"),
+ avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, Long.MaxValue))
+ .as("avg_key2"),
+ avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 1))
+ .as("avg_key3"))
+
+ checkAnswer(
+ df.select(selectList: _*).where($"value" < 2),
+ Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0)))
+ }
}
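As a usage sketch, the effect of the rule can be seen in the optimized plan. This assumes a spark-shell session (where the needed implicits are in scope); the plan output is abbreviated and operator names may differ by version:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val df = Seq((1, "x"), (2, "y"), (3, "y")).toDF("key", "value")
    val win = Window.partitionBy($"value").orderBy($"key")

    // The filter references only the partitioning column `value` and is deterministic,
    // so this rule evaluates it before the Window operator.
    df.select($"key", $"value", count($"key").over(win).as("cnt"))
      .where($"value" === "y")
      .explain()
    // Expected plan shape: Window above Filter (value = y), rather than Filter above Window.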