aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache
diff options
context:
space:
mode:
authorSameer Agarwal <sameerag@cs.berkeley.edu>2016-08-26 16:40:59 -0700
committerYin Huai <yhuai@databricks.com>2016-08-26 16:40:59 -0700
commit540e91280147a61727f99592a66c0cbb12328fac (patch)
tree47984ed7ef3e1a7023a45415bdf86a06c50be44d /sql/catalyst/src/main/scala/org/apache
parentf64a1ddd09a34d5d867ccbaba46204d75fad038d (diff)
downloadspark-540e91280147a61727f99592a66c0cbb12328fac.tar.gz
spark-540e91280147a61727f99592a66c0cbb12328fac.tar.bz2
spark-540e91280147a61727f99592a66c0cbb12328fac.zip
[SPARK-17244] Catalyst should not pushdown non-deterministic join conditions
## What changes were proposed in this pull request?
Given that non-deterministic expressions can be stateful, pushing them down the query plan during the optimization phase can cause incorrect behavior. This patch fixes that issue by explicitly disabling the pushdown of such expressions.
## How was this patch tested?
A new test in `FilterPushdownSuite` that checks catalyst behavior for both deterministic and non-deterministic join conditions.
Author: Sameer Agarwal <sameerag@cs.berkeley.edu>
Closes #14815 from sameeragarwal/constraint-inputfile.
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala21
1 files changed, 14 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 82ad0fb5ee..5c8316189d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1379,18 +1379,25 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
- * Splits join condition expressions into three categories based on the attributes required
- * to evaluate them.
+ * Splits join condition expressions or filter predicates (on a given join's output) into three
+ * categories based on the attributes required to evaluate them. Note that we explicitly exclude
+ * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+ * canEvaluateInRight to prevent pushing these predicates on either side of the join.
*
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
+ // Note: In order to ensure correctness, it's important to not change the relative ordering of
+ // any deterministic expression that follows a non-deterministic expression. To achieve this,
+ // we only consider pushing down those expressions that precede the first non-deterministic
+ // expression in the condition.
+ val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
val (leftEvaluateCondition, rest) =
- condition.partition(_.references subsetOf left.outputSet)
+ pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
- rest.partition(_.references subsetOf right.outputSet)
+ rest.partition(expr => expr.references.subsetOf(right.outputSet))
- (leftEvaluateCondition, rightEvaluateCondition, commonCondition)
+ (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -1441,7 +1448,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}
// push down the join filter into sub query scanning if applicable
- case f @ Join(left, right, joinType, joinCondition) =>
+ case j @ Join(left, right, joinType, joinCondition) =>
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
@@ -1471,7 +1478,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, LeftOuter, newJoinCond)
- case FullOuter => f
+ case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
}