aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Wenchen Fan <cloud0fan@outlook.com> 2015-07-23 09:37:53 -0700
committer: Yin Huai <yhuai@databricks.com> 2015-07-23 09:38:02 -0700
commit: 52ef76de219c4bf19c54c99414b89a67d0bf457b (patch)
tree: 806e90ad8178f3b8144129f6220ff34430a99522
parent: 26ed22aec8af42c6dc161e0a2827a4235a49a9a4 (diff)
download: spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.tar.gz
spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.tar.bz2
spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.zip
[SPARK-9082] [SQL] [FOLLOW-UP] use `partition` in `PushPredicateThroughProject`
a follow up of https://github.com/apache/spark/pull/7446

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7607 from cloud-fan/tmp and squashes the following commits:

7106989 [Wenchen Fan] use `partition` in `PushPredicateThroughProject`
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 22
1 files changed, 8 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2db3dd3d0..b59f800e7c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
// Split the condition into small conditions by `And`, so that we can push down part of this
// condition without nondeterministic expressions.
val andConditions = splitConjunctivePredicates(condition)
- val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
+
+ val (deterministic, nondeterministic) = andConditions.partition(_.collect {
+ case a: Attribute if aliasMap.contains(a) => aliasMap(a)
+ }.forall(_.deterministic))
// If there is no nondeterministic conditions, push down the whole condition.
- if (nondeterministicConditions.isEmpty) {
+ if (nondeterministic.isEmpty) {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
} else {
// If they are all nondeterministic conditions, leave it un-changed.
- if (nondeterministicConditions.length == andConditions.length) {
+ if (deterministic.isEmpty) {
filter
} else {
- val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
// Push down the small conditions without nondeterministic expressions.
- val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
- Filter(nondeterministicConditions.reduce(And),
+ val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
+ Filter(nondeterministic.reduce(And),
project.copy(child = Filter(pushedCondition, grandChild)))
}
}
}
- private def hasNondeterministic(
- condition: Expression,
- sourceAliases: AttributeMap[Expression]) = {
- condition.collect {
- case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
- }.exists(!_.deterministic)
- }
-
// Substitute any attributes that are produced by the child projection, so that we safely
// eliminate it.
private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {