aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <cloud0fan@outlook.com>2015-07-22 11:45:51 -0700
committerYin Huai <yhuai@databricks.com>2015-07-22 11:45:51 -0700
commit76520955fddbda87a5c53d0a394dedc91dce67e8 (patch)
tree7f007f4d16089a963b13ce68d69c738648d3152d
parentb55a36bc30a628d76baa721d38789fc219eccc27 (diff)
downloadspark-76520955fddbda87a5c53d0a394dedc91dce67e8.tar.gz
spark-76520955fddbda87a5c53d0a394dedc91dce67e8.tar.bz2
spark-76520955fddbda87a5c53d0a394dedc91dce67e8.zip
[SPARK-9082] [SQL] Filter using non-deterministic expressions should not be pushed down
Author: Wenchen Fan <cloud0fan@outlook.com> Closes #7446 from cloud-fan/filter and squashes the following commits: 330021e [Wenchen Fan] add exists to tree node 2cab68c [Wenchen Fan] more enhance 949be07 [Wenchen Fan] push down part of predicate if possible 3912f84 [Wenchen Fan] address comments 8ce15ca [Wenchen Fan] fix bug 557158e [Wenchen Fan] Filter using non-deterministic expressions should not be pushed down
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala50
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala45
2 files changed, 84 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e42f0b9a24..d2db3dd3d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -541,20 +541,50 @@ object SimplifyFilters extends Rule[LogicalPlan] {
*
* This heuristic is valid assuming the expression evaluation cost is minimal.
*/
-object PushPredicateThroughProject extends Rule[LogicalPlan] {
+object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
- val sourceAliases = fields.collect { case a @ Alias(c, _) =>
- (a.toAttribute: Attribute) -> c
- }.toMap
- project.copy(child = filter.copy(
- replaceAlias(condition, sourceAliases),
- grandChild))
+ // Create a map of Aliases to their values from the child projection.
+ // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
+ val aliasMap = AttributeMap(fields.collect {
+ case a: Alias => (a.toAttribute, a.child)
+ })
+
+ // Split the condition into small conditions by `And`, so that we can push down part of this
+ // condition without nondeterministic expressions.
+ val andConditions = splitConjunctivePredicates(condition)
+ val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
+
+ // If there is no nondeterministic conditions, push down the whole condition.
+ if (nondeterministicConditions.isEmpty) {
+ project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
+ } else {
+ // If they are all nondeterministic conditions, leave it un-changed.
+ if (nondeterministicConditions.length == andConditions.length) {
+ filter
+ } else {
+ val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
+ // Push down the small conditions without nondeterministic expressions.
+ val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
+ Filter(nondeterministicConditions.reduce(And),
+ project.copy(child = Filter(pushedCondition, grandChild)))
+ }
+ }
+ }
+
+ private def hasNondeterministic(
+ condition: Expression,
+ sourceAliases: AttributeMap[Expression]) = {
+ condition.collect {
+ case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
+ }.exists(!_.deterministic)
}
- private def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]) = {
- condition transform {
- case a: AttributeReference => sourceAliases.getOrElse(a, a)
+ // Substitute any attributes that are produced by the child projection, so that we safely
+ // eliminate it.
+ private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {
+ condition.transform {
+ case a: Attribute => sourceAliases.getOrElse(a, a)
}
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index dc28b3ffb5..0f1fde2fb0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
@@ -146,6 +146,49 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("nondeterministic: can't push down filter through project") {
+ val originalQuery = testRelation
+ .select(Rand(10).as('rand), 'a)
+ .where('rand > 5 || 'a > 5)
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ comparePlans(optimized, originalQuery)
+ }
+
+ test("nondeterministic: push down part of filter through project") {
+ val originalQuery = testRelation
+ .select(Rand(10).as('rand), 'a)
+ .where('rand > 5 && 'a > 5)
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+
+ val correctAnswer = testRelation
+ .where('a > 5)
+ .select(Rand(10).as('rand), 'a)
+ .where('rand > 5)
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("nondeterministic: push down filter through project") {
+ val originalQuery = testRelation
+ .select(Rand(10).as('rand), 'a)
+ .where('a > 5 && 'a < 10)
+ .analyze
+
+ val optimized = Optimize.execute(originalQuery)
+ val correctAnswer = testRelation
+ .where('a > 5 && 'a < 10)
+ .select(Rand(10).as('rand), 'a)
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
test("filters: combines filters") {
val originalQuery = testRelation
.select('a)