author     Cheng Lian <lian@databricks.com>       2016-02-25 20:43:03 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-02-25 20:43:03 +0800
commit     3fa6491be66dad690ca5329dd32e7c82037ae8c1 (patch)
tree       e60e665f5bf5f90ecd0b11a6ca01629f60a93bdc /sql
parent     2e44031fafdb8cf486573b98e4faa6b31ffb90a4 (diff)
[SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)
## What changes were proposed in this pull request?

Predicates shouldn't be pushed through a project with nondeterministic field(s). See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details.

This PR targets master, branch-1.6, and branch-1.5.

## How was this patch tested?

A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter sits on top of a project with a nondeterministic field; the optimized query plan shouldn't change in this case.

Author: Cheng Lian <lian@databricks.com>

Closes #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.
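For context (not part of the commit itself), here is a minimal, hypothetical sketch of why the rewrite is unsafe: if a filter on a deterministic column were pushed below a projection containing `rand()`, the nondeterministic column would be evaluated over a different (already filtered) stream of input rows and could take different values. The DataFrame API calls, column names, and seed below are illustrative assumptions, not taken from the patch.

```scala
// Illustrative sketch only; not part of this patch. Assumes a local Spark build
// that includes SPARK-13473. The table/column names are made up for the example.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

object NoPushdownThroughNondeterministicProject {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-13473-demo")
      .getOrCreate()
    import spark.implicits._

    val df = spark.range(0, 100).toDF("a")

    // Project a nondeterministic column first, then filter on a deterministic one.
    // Pushing the filter below the project would make rand() see a different
    // (filtered) sequence of input rows, so surviving rows could get different values.
    val q = df.select(rand(10).as("r"), $"a").where($"a" > 5)

    // With this fix, the optimized plan keeps the Filter above the Project
    // instead of pushing it past the nondeterministic projection.
    q.explain(true)

    spark.stop()
  }
}
```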
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala          |  9
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 27
2 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2b804976f3..2aeb9575f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -792,7 +792,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+    // SPARK-13473: We can't push the predicate down when the underlying projection outputs
+    // nondeterministic field(s). Nondeterministic expressions are essentially stateful. This
+    // implies that, for a given input row, the output is determined by the expression's initial
+    // state and all the input rows processed before it. In other words, the order of input rows
+    // matters for nondeterministic expressions, while pushing down predicates changes that order.
+    case filter @ Filter(condition, project @ Project(fields, grandChild))
+      if fields.forall(_.deterministic) =>
+
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
       val aliasMap = AttributeMap(fields.collect {
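The key change is the `fields.forall(_.deterministic)` guard: the rewrite now fires only when every projected field is deterministic. The following is a rough, self-contained sketch of that guarded-rewrite pattern; the `Expr`/`Plan` types are invented stand-ins for Catalyst's `Expression` and `LogicalPlan`, and the real rule additionally rewrites the filter condition through the `aliasMap` shown above before pushing it down.

```scala
// Hypothetical, simplified stand-ins for Catalyst types, for illustration only.
object GuardedPushdownSketch {
  sealed trait Expr { def deterministic: Boolean }
  case class Col(name: String) extends Expr { val deterministic = true }
  case class RandExpr(seed: Long) extends Expr { val deterministic = false }
  case class Alias(child: Expr, name: String) extends Expr {
    val deterministic: Boolean = child.deterministic
  }
  case class GreaterThan(left: Expr, right: Expr) extends Expr {
    val deterministic: Boolean = left.deterministic && right.deterministic
  }
  case class Lit(value: Int) extends Expr { val deterministic = true }

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Project(fields: Seq[Expr], child: Plan) extends Plan
  case class Filter(condition: Expr, child: Plan) extends Plan

  // Push a Filter below a Project only when every projected field is deterministic,
  // mirroring the `fields.forall(_.deterministic)` guard added in this patch.
  def pushFilterThroughProject(plan: Plan): Plan = plan match {
    case Filter(cond, Project(fields, child)) if fields.forall(_.deterministic) =>
      Project(fields, Filter(cond, child))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // A plan shaped like the new test case: Filter('a > 5) over Project(rand, 'a).
    // It stays unchanged because the projected `rand` field is nondeterministic.
    val plan = Filter(
      GreaterThan(Col("a"), Lit(5)),
      Project(Seq(Alias(RandExpr(10), "rand"), Col("a")), Relation("testRelation")))
    assert(pushFilterThroughProject(plan) == plan)
    println(pushFilterThroughProject(plan))
  }
}
```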
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7d60862f5a..1292aa0003 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -98,7 +98,7 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

-  test("nondeterministic: can't push down filter through project") {
+  test("nondeterministic: can't push down filter with nondeterministic condition through project") {
     val originalQuery = testRelation
       .select(Rand(10).as('rand), 'a)
       .where('rand > 5 || 'a > 5)
@@ -109,36 +109,15 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }

-  test("nondeterministic: push down part of filter through project") {
+  test("nondeterministic: can't push down filter through project with nondeterministic field") {
     val originalQuery = testRelation
       .select(Rand(10).as('rand), 'a)
-      .where('rand > 5 && 'a > 5)
-      .analyze
-
-    val optimized = Optimize.execute(originalQuery)
-
-    val correctAnswer = testRelation
       .where('a > 5)
-      .select(Rand(10).as('rand), 'a)
-      .where('rand > 5)
-      .analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("nondeterministic: push down filter through project") {
-    val originalQuery = testRelation
-      .select(Rand(10).as('rand), 'a)
-      .where('a > 5 && 'a < 10)
       .analyze

     val optimized = Optimize.execute(originalQuery)

-    val correctAnswer = testRelation
-      .where('a > 5 && 'a < 10)
-      .select(Rand(10).as('rand), 'a)
-      .analyze
-    comparePlans(optimized, correctAnswer)
+    comparePlans(optimized, originalQuery)
   }

   test("filters: combines filters") {