From 540e91280147a61727f99592a66c0cbb12328fac Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 26 Aug 2016 16:40:59 -0700 Subject: [SPARK-17244] Catalyst should not pushdown non-deterministic join conditions ## What changes were proposed in this pull request? Given that non-deterministic expressions can be stateful, pushing them down the query plan during the optimization phase can cause incorrect behavior. This patch fixes that issue by explicitly disabling that. ## How was this patch tested? A new test in `FilterPushdownSuite` that checks catalyst behavior for both deterministic and non-deterministic join conditions. Author: Sameer Agarwal Closes #14815 from sameeragarwal/constraint-inputfile. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 21 ++++++++++++++------- .../catalyst/optimizer/FilterPushdownSuite.scala | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 82ad0fb5ee..5c8316189d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1379,18 +1379,25 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { */ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { /** - * Splits join condition expressions into three categories based on the attributes required - * to evaluate them. + * Splits join condition expressions or filter predicates (on a given join's output) into three + * categories based on the attributes required to evaluate them. Note that we explicitly exclude + * on-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or + * canEvaluateInRight to prevent pushing these predicates on either side of the join. * * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth) */ private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = { + // Note: In order to ensure correctness, it's important to not change the relative ordering of + // any deterministic expression that follows a non-deterministic expression. To achieve this, + // we only consider pushing down those expressions that precede the first non-deterministic + // expression in the condition. + val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic) val (leftEvaluateCondition, rest) = - condition.partition(_.references subsetOf left.outputSet) + pushDownCandidates.partition(_.references.subsetOf(left.outputSet)) val (rightEvaluateCondition, commonCondition) = - rest.partition(_.references subsetOf right.outputSet) + rest.partition(expr => expr.references.subsetOf(right.outputSet)) - (leftEvaluateCondition, rightEvaluateCondition, commonCondition) + (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic) } def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -1441,7 +1448,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } // push down the join filter into sub query scanning if applicable - case f @ Join(left, right, joinType, joinCondition) => + case j @ Join(left, right, joinType, joinCondition) => val (leftJoinConditions, rightJoinConditions, commonJoinCondition) = split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) @@ -1471,7 +1478,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, LeftOuter, newJoinCond) - case FullOuter => f + case FullOuter => j case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 9f25e9d8e9..55836f96f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -987,4 +987,18 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } + + test("join condition pushdown: deterministic and non-deterministic") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + // Verify that all conditions preceding the first non-deterministic condition are pushed down + // by the optimizer and others are not. + val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 && + "x.a".attr === Rand(10) && "y.b".attr === 5)) + val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5), + condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5)) + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze) + } } -- cgit v1.2.3