diff options
author | Wenchen Fan <cloud0fan@outlook.com> | 2015-07-23 09:37:53 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-07-23 09:38:02 -0700 |
commit | 52ef76de219c4bf19c54c99414b89a67d0bf457b (patch) | |
tree | 806e90ad8178f3b8144129f6220ff34430a99522 | |
parent | 26ed22aec8af42c6dc161e0a2827a4235a49a9a4 (diff) | |
download | spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.tar.gz spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.tar.bz2 spark-52ef76de219c4bf19c54c99414b89a67d0bf457b.zip |
[SPARK-9082] [SQL] [FOLLOW-UP] use `partition` in `PushPredicateThroughProject`
a follow up of https://github.com/apache/spark/pull/7446
Author: Wenchen Fan <cloud0fan@outlook.com>
Closes #7607 from cloud-fan/tmp and squashes the following commits:
7106989 [Wenchen Fan] use `partition` in `PushPredicateThroughProject`
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 22 |
1 files changed, 8 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d2db3dd3d0..b59f800e7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe // Split the condition into small conditions by `And`, so that we can push down part of this // condition without nondeterministic expressions. val andConditions = splitConjunctivePredicates(condition) - val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap)) + + val (deterministic, nondeterministic) = andConditions.partition(_.collect { + case a: Attribute if aliasMap.contains(a) => aliasMap(a) + }.forall(_.deterministic)) // If there is no nondeterministic conditions, push down the whole condition. - if (nondeterministicConditions.isEmpty) { + if (nondeterministic.isEmpty) { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) } else { // If they are all nondeterministic conditions, leave it un-changed. - if (nondeterministicConditions.length == andConditions.length) { + if (deterministic.isEmpty) { filter } else { - val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap)) // Push down the small conditions without nondeterministic expressions. - val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And) - Filter(nondeterministicConditions.reduce(And), + val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And) + Filter(nondeterministic.reduce(And), project.copy(child = Filter(pushedCondition, grandChild))) } } } - private def hasNondeterministic( - condition: Expression, - sourceAliases: AttributeMap[Expression]) = { - condition.collect { - case a: Attribute if sourceAliases.contains(a) => sourceAliases(a) - }.exists(!_.deterministic) - } - // Substitute any attributes that are produced by the child projection, so that we safely // eliminate it. private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = { |