aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-03-08 15:40:45 -0800
committerYin Huai <yhuai@databricks.com>2016-03-08 15:40:45 -0800
commite430614eae53c8864b31a1dc64db83e27100d1d9 (patch)
tree8ca19928c742c82495aa473d7f2d28cfa9b372a6 /sql
parent1e28840594b9d972c96d3922ca0bf0f76e313e82 (diff)
downloadspark-e430614eae53c8864b31a1dc64db83e27100d1d9.tar.gz
spark-e430614eae53c8864b31a1dc64db83e27100d1d9.tar.bz2
spark-e430614eae53c8864b31a1dc64db83e27100d1d9.zip
[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks
## What changes were proposed in this pull request? If a filter predicate or a join condition consists of `IsNotNull` checks, we should reorder these checks such that these non-nullability checks are evaluated before the rest of the predicates. For e.g., if a filter predicate is of the form `a > 5 && isNotNull(b)`, we should rewrite this as `isNotNull(b) && a > 5` during physical plan generation. ## How was this patch tested? new unit tests that verify the physical plan for both filters and joins in `ReorderedPredicateSuite` Author: Sameer Agarwal <sameer@databricks.com> Closes #11511 from sameeragarwal/reorder-isnotnull.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala37
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala103
3 files changed, 150 insertions, 14 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 56a3dd02f9..1e4523e2d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst.planning
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -26,8 +28,28 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
* be used for execution. If this strategy does not apply to the give logical operation then an
* empty list should be returned.
*/
-abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
+ extends PredicateHelper with Logging {
+
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+
+ // Attempts to re-order the individual conjunctive predicates in an expression to short circuit
+ // the evaluation of relatively cheaper checks (e.g., checking for nullability) before others.
+ protected def reorderPredicates(expr: Expression): Expression = {
+ splitConjunctivePredicates(expr)
+ .sortWith((x, _) => x.isInstanceOf[IsNotNull])
+ .reduce(And)
+ }
+
+ // Wrapper around reorderPredicates(expr: Expression) to reorder optional conditions in joins
+ protected def reorderPredicates(exprOpt: Option[Expression]): Option[Expression] = {
+ exprOpt match {
+ case Some(expr) =>
+ Option(reorderPredicates(expr))
+ case None =>
+ exprOpt
+ }
+ }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index debd04aa95..36fea4d203 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,11 +66,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(
LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
joins.BroadcastLeftSemiJoinHash(
- leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+ leftKeys, rightKeys, planLater(left), planLater(right),
+ reorderPredicates(condition)) :: Nil
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
joins.LeftSemiJoinHash(
- leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
+ leftKeys, rightKeys, planLater(left), planLater(right),
+ reorderPredicates(condition)) :: Nil
case _ => Nil
}
}
@@ -111,33 +113,39 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
+ planLater(left), planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), planLater(left),
+ planLater(right)))
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoin(
- leftKeys, rightKeys, condition, planLater(left), planLater(right)) :: Nil
+ leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
+ planLater(right)) :: Nil
// --- Outer joins --------------------------------------------------------------------------
case ExtractEquiJoinKeys(
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, LeftOuter, BuildRight, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, LeftOuter, BuildRight, reorderPredicates(condition),
+ planLater(left), planLater(right)))
case ExtractEquiJoinKeys(
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, RightOuter, BuildLeft, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, RightOuter, BuildLeft, reorderPredicates(condition),
+ planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
- leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+ leftKeys, rightKeys, joinType, reorderPredicates(condition), planLater(left),
+ planLater(right)) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
@@ -252,10 +260,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, condition) =>
execution.joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), joins.BuildLeft, j.joinType, condition) :: Nil
+ planLater(left), planLater(right), joins.BuildLeft, j.joinType,
+ reorderPredicates(condition)) :: Nil
case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | LeftSemi, condition) =>
execution.joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), joins.BuildRight, j.joinType, condition) :: Nil
+ planLater(left), planLater(right), joins.BuildRight, j.joinType,
+ reorderPredicates(condition)) :: Nil
case _ => Nil
}
}
@@ -265,7 +275,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Join(left, right, Inner, None) =>
execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
case logical.Join(left, right, Inner, Some(condition)) =>
- execution.Filter(condition,
+ execution.Filter(reorderPredicates(condition),
execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
case _ => Nil
}
@@ -282,7 +292,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
// This join could be very slow or even hang forever
joins.BroadcastNestedLoopJoin(
- planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
+ planLater(left), planLater(right), buildSide, joinType,
+ reorderPredicates(condition)) :: Nil
case _ => Nil
}
}
@@ -341,7 +352,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
- execution.Filter(condition, planLater(child)) :: Nil
+ execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.Expand(e.projections, e.output, planLater(child)) :: Nil
case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
new file mode 100644
index 0000000000..dd0e43866b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.execution
+import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {
+
+ setupTestData()
+
+ // Verifies that (a) In the new condition, the IsNotNull operators precede rest of the operators
+ // and (b) The relative sort order of IsNotNull and !IsNotNull operators is still maintained
+ private def verifyStableOrder(before: Expression, after: Expression): Unit = {
+ val oldPredicates = splitConjunctivePredicates(before)
+ splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) =>
+ // Verify IsNotNull operator ordering
+ assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])
+
+ // Verify stable sort order
+ if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) ||
+ (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) {
+ assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
+ }
+ }
+ }
+
+ test("null ordering in filter predicates") {
+ val query = sql(
+ """
+ |SELECT * from testData
+ |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
+ """.stripMargin)
+ .queryExecution
+
+ val logicalPlan = query.optimizedPlan
+ val physicalPlan = query.sparkPlan
+ assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
+ assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)
+
+ val logicalCondition = logicalPlan.collect {
+ case logical.Filter(condition, _) =>
+ condition
+ }.head
+
+ val physicalCondition = physicalPlan.collect {
+ case Filter(condition, _) =>
+ condition
+ }.head
+
+ verifyStableOrder(logicalCondition, physicalCondition)
+ }
+
+ test("null ordering in join predicates") {
+ sqlContext.cacheManager.clearCache()
+ val query = sql(
+ """
+ |SELECT * FROM testData t1
+ |LEFT SEMI JOIN testData t2
+ |ON t1.key = t2.key
+ |AND t1.key + t2.key != 5
+ |AND CONCAT(t1.value, t2.value) IS NOT NULL
+ """.stripMargin)
+ .queryExecution
+
+ val logicalPlan = query.optimizedPlan
+ val physicalPlan = query.sparkPlan
+ assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
+ assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)
+
+ val logicalCondition = logicalPlan.collect {
+ case Join(_, _, _, condition) =>
+ condition.get
+ }.head
+
+ val physicalCondition = physicalPlan.collect {
+ case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
+ conditionOpt.get
+ }.head
+
+ verifyStableOrder(logicalCondition, physicalCondition)
+ }
+}