aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2016-03-09 10:05:57 -0800
committerDavies Liu <davies.liu@gmail.com>2016-03-09 10:05:57 -0800
commit7791d0c3a9bdfe73e071266846f9ab1491fce50c (patch)
tree61ea12e38162c90f452487cf765e964f020550e1 /sql
parent9634e17d0183d43606a96fbba630e4c6ad720f7c (diff)
downloadspark-7791d0c3a9bdfe73e071266846f9ab1491fce50c.tar.gz
spark-7791d0c3a9bdfe73e071266846f9ab1491fce50c.tar.bz2
spark-7791d0c3a9bdfe73e071266846f9ab1491fce50c.zip
Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks"
This reverts commit e430614eae53c8864b31a1dc64db83e27100d1d9.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala37
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala103
3 files changed, 14 insertions, 150 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 1e4523e2d8..56a3dd02f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -18,8 +18,6 @@
package org.apache.spark.sql.catalyst.planning
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -28,28 +26,8 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
* be used for execution. If this strategy does not apply to the give logical operation then an
* empty list should be returned.
*/
-abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
- extends PredicateHelper with Logging {
-
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
-
- // Attempts to re-order the individual conjunctive predicates in an expression to short circuit
- // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others.
- protected def reorderPredicates(expr: Expression): Expression = {
- splitConjunctivePredicates(expr)
- .sortWith((x, _) => x.isInstanceOf[IsNotNull])
- .reduce(And)
- }
-
- // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins
- protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = {
- exprOpt match {
- case Some(expr) =>
- Option(reorderPredicates(expr))
- case None =>
- exprOpt
- }
- }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 36fea4d203..debd04aa95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,13 +66,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(
LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
joins.BroadcastLeftSemiJoinHash(
- leftKeys, rightKeys, planLater(left), planLater(right),
- reorderPredicates(condition)) :: Nil
+ leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
joins.LeftSemiJoinHash(
- leftKeys, rightKeys, planLater(left), planLater(right),
- reorderPredicates(condition)) :: Nil
+ leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
case _ => Nil
}
}
@@ -113,39 +111,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
- planLater(left), planLater(right)))
+ leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left),
- planLater(right)))
+ leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoin(
- leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
- planLater(right)) :: Nil
+ leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil
// --- Outer joins --------------------------------------------------------------------------
case ExtractEquiJoinKeys(
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition),
- planLater(left), planLater(right)))
+ leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition),
- planLater(left), planLater(right)))
+ leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
- leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left),
- planLater(right)) :: Nil
+ leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
@@ -260,12 +252,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
execution.joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), joins.BuildLeft, j.joinType,
- reorderPredicates(condition)) :: Nil
+ planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
execution.joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), joins.BuildRight, j.joinType,
- reorderPredicates(condition)) :: Nil
+ planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
case _ => Nil
}
}
@@ -275,7 +265,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Join(left, right, Inner, None) =>
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
case logical.Join(left, right, Inner, Some(condition)) =>
- execution.Filter(reorderPredicates(condition),
+ execution.Filter(condition,
execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
case _ => Nil
}
@@ -292,8 +282,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
// This join could be very slow or even hang forever
joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), buildSide, joinType,
- reorderPredicates(condition)) :: Nil
+ planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case _ => Nil
}
}
@@ -352,7 +341,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
- execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
+ execution.Filter(condition, planLater(child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.Expand(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
deleted file mode 100644
index dd0e43866b..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
-import org.apache.spark.sql.test.SharedSQLContext
-
-
-class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {
-
- setupTestData()
-
- // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators
- // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained
- private def verifyStableOrder(before: Expression, after: Expression): Unit = {
- val oldPredicates = splitConjunctivePredicates(before)
- splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) =>
- // Verify IsNotNull operator ordering
- assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])
-
- // Verify stable sort order
- if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) ||
- (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) {
- assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
- }
- }
- }
-
- test("null ordering in filter predicates") {
- val query = sql(
- """
- |SELECT * from testData
- |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
- """.stripMargin)
- .queryExecution
-
- val logicalPlan = query.optimizedPlan
- val physicalPlan = query.sparkPlan
- assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
- assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)
-
- val logicalCondition = logicalPlan.collect {
- case logical.Filter(condition, _) =>
- condition
- }.head
-
- val physicalCondition = physicalPlan.collect {
- case Filter(condition, _) =>
- condition
- }.head
-
- verifyStableOrder(logicalCondition, physicalCondition)
- }
-
- test("null ordering in join predicates") {
- sqlContext.cacheManager.clearCache()
- val query = sql(
- """
- |SELECT * FROM testData t1
- |LEFT SEMI JOIN testData t2
- |ON t1.key = t2.key
- |AND t1.key + t2.key != 5
- |AND CONCAT(t1.value, t2.value) IS NOT NULL
- """.stripMargin)
- .queryExecution
-
- val logicalPlan = query.optimizedPlan
- val physicalPlan = query.sparkPlan
- assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
- assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)
-
- val logicalCondition = logicalPlan.collect {
- case Join(_, _, _, condition) =>
- condition.get
- }.head
-
- val physicalCondition = physicalPlan.collect {
- case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
- conditionOpt.get
- }.head
-
- verifyStableOrder(logicalCondition, physicalCondition)
- }
-}