aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-08-19 21:11:35 +0800
committerWenchen Fan <wenchen@databricks.com>2016-08-19 21:11:35 +0800
commit67e59d464f782ff5f509234212aa072a7653d7bf (patch)
tree7b7c935a0ead8d5043443a4712584f173ef83342 /sql/catalyst/src/main
parent072acf5e1460d66d4b60b536d5b2ccddeee80794 (diff)
downloadspark-67e59d464f782ff5f509234212aa072a7653d7bf.tar.gz
spark-67e59d464f782ff5f509234212aa072a7653d7bf.tar.bz2
spark-67e59d464f782ff5f509234212aa072a7653d7bf.zip
[SPARK-16994][SQL] Whitelist operators for predicate pushdown
## What changes were proposed in this pull request? This patch changes predicate pushdown optimization rule (PushDownPredicate) from using a blacklist to a whitelist. That is to say, operators must be explicitly allowed. This approach is more future-proof: previously it was possible for us to introduce a new operator and then render the optimization rule incorrect. This also fixes the bug that previously we allowed pushing filter beneath limit, which was incorrect. That is to say, before this patch, the optimizer would rewrite ``` select * from (select * from range(10) limit 5) where id > 3 to select * from range(10) where id > 3 limit 5 ``` ## How was this patch tested? - a unit test case in FilterPushdownSuite - an end-to-end test in limit.sql Author: Reynold Xin <rxin@databricks.com> Closes #14713 from rxin/SPARK-16994.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala23
1 files changed, 17 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f7aa6da0a5..ce57f05868 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1208,17 +1208,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}
- // two filters should be combine together by other rules
- case filter @ Filter(_, _: Filter) => filter
- // should not push predicates through sample, or will generate different results.
- case filter @ Filter(_, _: Sample) => filter
-
- case filter @ Filter(condition, u: UnaryNode) if u.expressions.forall(_.deterministic) =>
+ case filter @ Filter(condition, u: UnaryNode)
+ if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
pushDownPredicate(filter, u.child) { predicate =>
u.withNewChildren(Seq(Filter(predicate, u.child)))
}
}
+ private def canPushThrough(p: UnaryNode): Boolean = p match {
+ // Note that some operators (e.g. project, aggregate, union) are being handled separately
+ // (earlier in this rule).
+ case _: AppendColumns => true
+ case _: BroadcastHint => true
+ case _: Distinct => true
+ case _: Generate => true
+ case _: Pivot => true
+ case _: RedistributeData => true
+ case _: Repartition => true
+ case _: ScriptTransformation => true
+ case _: Sort => true
+ case _ => false
+ }
+
private def pushDownPredicate(
filter: Filter,
grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {