From 7791d0c3a9bdfe73e071266846f9ab1491fce50c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 9 Mar 2016 10:05:57 -0800 Subject: Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks" This reverts commit e430614eae53c8864b31a1dc64db83e27100d1d9. --- .../spark/sql/catalyst/planning/QueryPlanner.scala | 24 +---- .../spark/sql/execution/SparkStrategies.scala | 37 +++----- .../sql/execution/ReorderedPredicateSuite.scala | 103 --------------------- 3 files changed, 14 insertions(+), 150 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 1e4523e2d8..56a3dd02f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.catalyst.planning import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper} -import org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode @@ -28,28 +26,8 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * be used for execution. If this strategy does not apply to the give logical operation then an * empty list should be returned. */ -abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] - extends PredicateHelper with Logging { - +abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { def apply(plan: LogicalPlan): Seq[PhysicalPlan] - - // Attempts to re-order the individual conjunctive predicates in an expression to short circuit - // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others. - protected def reorderPredicates(expr: Expression): Expression = { - splitConjunctivePredicates(expr) - .sortWith((x, _) => x.isInstanceOf[IsNotNull]) - .reduce(And) - } - - // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins - protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = { - exprOpt match { - case Some(expr) => - Option(reorderPredicates(expr)) - case None => - exprOpt - } - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 36fea4d203..debd04aa95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -66,13 +66,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys( LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => joins.BroadcastLeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), - reorderPredicates(condition)) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => joins.LeftSemiJoinHash( - leftKeys, rightKeys, planLater(left), planLater(right), - reorderPredicates(condition)) :: Nil + leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil case _ => Nil } } @@ -113,39 +111,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition), - planLater(left), planLater(right))) + leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left), - planLater(right))) + leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeJoin( - leftKeys, rightKeys, reorderPredicates(condition), planLater(left), - planLater(right)) :: Nil + leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil // --- Outer joins -------------------------------------------------------------------------- case ExtractEquiJoinKeys( LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition), - planLater(left), planLater(right))) + leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys( RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) => Seq(joins.BroadcastHashJoin( - leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition), - planLater(left), planLater(right))) + leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right))) case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) if RowOrdering.isOrderable(leftKeys) => joins.SortMergeOuterJoin( - leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left), - planLater(right)) :: Nil + leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil // --- Cases where this strategy does not apply --------------------------------------------- @@ -260,12 +252,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildLeft, j.joinType, - reorderPredicates(condition)) :: Nil + planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) => execution.joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), joins.BuildRight, j.joinType, - reorderPredicates(condition)) :: Nil + planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil case _ => Nil } } @@ -275,7 +265,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Join(left, right, Inner, None) => execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil case logical.Join(left, right, Inner, Some(condition)) => - execution.Filter(reorderPredicates(condition), + execution.Filter(condition, execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil case _ => Nil } @@ -292,8 +282,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } // This join could be very slow or even hang forever joins.BroadcastNestedLoopJoin( - planLater(left), planLater(right), buildSide, joinType, - reorderPredicates(condition)) :: Nil + planLater(left), planLater(right), buildSide, joinType, condition) :: Nil case _ => Nil } } @@ -352,7 +341,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => - execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil + execution.Filter(condition, planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => execution.Expand(e.projections, e.output, planLater(child)) :: Nil case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala deleted file mode 100644 index dd0e43866b..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper} -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.execution -import org.apache.spark.sql.execution.joins.LeftSemiJoinHash -import org.apache.spark.sql.test.SharedSQLContext - - -class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper { - - setupTestData() - - // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators - // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained - private def verifyStableOrder(before: Expression, after: Expression): Unit = { - val oldPredicates = splitConjunctivePredicates(before) - splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) => - // Verify IsNotNull operator ordering - assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull]) - - // Verify stable sort order - if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) || - (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) { - assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y)) - } - } - } - - test("null ordering in filter predicates") { - val query = sql( - """ - |SELECT * from testData - |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5 - """.stripMargin) - .queryExecution - - val logicalPlan = query.optimizedPlan - val physicalPlan = query.sparkPlan - assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined) - assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined) - - val logicalCondition = logicalPlan.collect { - case logical.Filter(condition, _) => - condition - }.head - - val physicalCondition = physicalPlan.collect { - case Filter(condition, _) => - condition - }.head - - verifyStableOrder(logicalCondition, physicalCondition) - } - - test("null ordering in join predicates") { - sqlContext.cacheManager.clearCache() - val query = sql( - """ - |SELECT * FROM testData t1 - |LEFT SEMI JOIN testData t2 - |ON t1.key = t2.key - |AND t1.key + t2.key != 5 - |AND CONCAT(t1.value, t2.value) IS NOT NULL - """.stripMargin) - .queryExecution - - val logicalPlan = query.optimizedPlan - val physicalPlan = query.sparkPlan - assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined) - assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined) - - val logicalCondition = logicalPlan.collect { - case Join(_, _, _, condition) => - condition.get - }.head - - val physicalCondition = physicalPlan.collect { - case LeftSemiJoinHash(_, _, _, _, conditionOpt) => - conditionOpt.get - }.head - - verifyStableOrder(logicalCondition, physicalCondition) - } -} -- cgit v1.2.3