From ef77003178eb5cdcb4fe519fc540917656c5d577 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Mon, 7 Mar 2016 12:04:59 -0800
Subject: [SPARK-13495][SQL] Add Null Filters in the query plan for Filters/Joins based on their data constraints

## What changes were proposed in this pull request?

This PR adds an optimizer rule to eliminate reading (unnecessary) NULL values if they are not required for correctness by inserting `isNotNull` filters in the query plan. These filters are currently inserted beneath existing `Filter` and `Join` operators and are inferred based on their data constraints.

Note: While this optimization is applicable to all types of join, it primarily benefits `Inner` and `LeftSemi` joins.
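As an illustration (an editorial sketch, not part of the patch itself): using the Catalyst test DSL exactly as in the `NullFilteringSuite` added below, the rule rewrites an inner equi-join so that both children are wrapped in `IsNotNull` filters derived from the join's constraints. The relation and column names simply mirror that suite; paste into a Scala REPL with spark-catalyst on the classpath.

```scala
// Editorial sketch based on the NullFilteringSuite added by this patch; the
// relation, columns and DSL usage all mirror that suite.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.IsNotNull
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

// Original plan: a plain inner equi-join on x.a = y.a.
val original = x.join(y, condition = Some("x.a".attr === "y.a".attr)).analyze

// After the NullFiltering batch (see the Optimize executor in the suite), the
// join constraints imply that x.a and y.a cannot be null, so both sides are
// wrapped in IsNotNull filters:
val expected = x.where(IsNotNull('a))
  .join(y.where(IsNotNull('a)), condition = Some("x.a".attr === "y.a".attr))
  .analyze
```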
## How was this patch tested?

1. Added a new `NullFilteringSuite` that tests for `IsNotNull` filters in the query plan for joins and filters. Also tests the interaction with the `CombineFilters` optimizer rule.
2. Test generated ExpressionTrees via `OrcFilterSuite`
3. Test filter source pushdown logic via `SimpleTextHadoopFsRelationSuite`

cc yhuai nongli

Author: Sameer Agarwal

Closes #11372 from sameeragarwal/gen-isnotnull.
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 49 +++++++++++
 .../catalyst/optimizer/NullFilteringSuite.scala    | 95 ++++++++++++++++++++++
 .../apache/spark/sql/catalyst/plans/PlanTest.scala | 18 +++-
 .../apache/spark/sql/execution/PlannerSuite.scala  |  2 +-
 .../datasources/parquet/ParquetFilterSuite.scala   |  6 +-
 .../apache/spark/sql/hive/orc/OrcFilterSuite.scala | 16 ++--
 .../sources/SimpleTextHadoopFsRelationSuite.scala  |  4 +-
 .../spark/sql/sources/SimpleTextRelation.scala     | 12 ++-
 8 files changed, 182 insertions(+), 20 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
(limited to 'sql')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c83ec0fcb5..69ceea6329 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -77,6 +77,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       CombineLimits,
       CombineUnions,
       // Constant folding and strength reduction
+      NullFiltering,
       NullPropagation,
       OptimizeIn,
       ConstantFolding,
@@ -593,6 +594,54 @@ object NullPropagation extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
+ * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
+ * existing Filters and Join operators and are inferred based on their data constraints.
+ *
+ * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
+ * LeftSemi joins.
+ */
+object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case filter @ Filter(condition, child) =>
+      // We generate a list of additional isNotNull filters from the operator's existing constraints
+      // but remove those that are either already part of the filter condition or are part of the
+      // operator's child constraints.
+      val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+        (child.constraints ++ splitConjunctivePredicates(condition))
+      if (newIsNotNullConstraints.nonEmpty) {
+        Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+      } else {
+        filter
+      }
+
+    case join @ Join(left, right, joinType, condition) =>
+      val leftIsNotNullConstraints = join.constraints
+        .filter(_.isInstanceOf[IsNotNull])
+        .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
+      val rightIsNotNullConstraints =
+        join.constraints
+          .filter(_.isInstanceOf[IsNotNull])
+          .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
+      val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
+        Filter(leftIsNotNullConstraints.reduce(And), left)
+      } else {
+        left
+      }
+      val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
+        Filter(rightIsNotNullConstraints.reduce(And), right)
+      } else {
+        right
+      }
+      if (newLeftChild != left || newRightChild != right) {
+        Join(newLeftChild, newRightChild, joinType, condition)
+      } else {
+        join
+      }
+  }
+}
+
 /**
  * Replaces [[Expression Expressions]] that can be statically evaluated with
  * equivalent [[Literal]] values.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
new file mode 100644
index 0000000000..7e52d5ef67
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class NullFilteringSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("NullFiltering", Once, NullFiltering) :: + Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("filter: filter out nulls in condition") { + val originalQuery = testRelation.where('a === 1).analyze + val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single inner join: filter out nulls on either side on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze + val left = x.where(IsNotNull('a) && IsNotNull('b)) + val right = y.where(IsNotNull('a) && IsNotNull('c)) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single inner join with pre-existing filters: filter out nulls on either side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.where('b > 5).join(y.where('c === 10), + condition = Some("x.a".attr === "y.a".attr)).analyze + val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5) + val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("single outer join: no null filters are generated") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, FullOuter, + condition = Some("x.a".attr === "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, originalQuery) + } + + test("multiple inner joins: filter out nulls on all sides on equi-join keys") { + val t1 = testRelation.subquery('t1) + val t2 = testRelation.subquery('t2) + val t3 = testRelation.subquery('t3) + val t4 = testRelation.subquery('t4) + + val originalQuery = t1 + .join(t2, condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3, condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze + val correctAnswer = t1.where(IsNotNull('b)) + .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr)) + .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr)) + .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index f9874088b5..0541844e0b 100644 --- 
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util._
 /**
  * Provides helper methods for comparing plans.
  */
-abstract class PlanTest extends SparkFunSuite {
+abstract class PlanTest extends SparkFunSuite with PredicateHelper {
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.
@@ -39,10 +39,22 @@ abstract class PlanTest extends SparkFunSuite {
     }
   }
 
+  /**
+   * Normalizes the filter conditions that appear in the plan. For instance,
+   * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2))
+   * etc., will all now be equivalent.
+   */
+  private def normalizeFilters(plan: LogicalPlan) = {
+    plan transform {
+      case filter @ Filter(condition: Expression, child: LogicalPlan) =>
+        Filter(splitConjunctivePredicates(condition).sortBy(_.hashCode()).reduce(And), child)
+    }
+  }
+
   /** Fails the test if the two plans do not match */
   protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
-    val normalized1 = normalizeExprIds(plan1)
-    val normalized2 = normalizeExprIds(plan2)
+    val normalized1 = normalizeFilters(normalizeExprIds(plan1))
+    val normalized2 = normalizeFilters(normalizeExprIds(plan2))
     if (normalized1 != normalized2) {
       fail(
         s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f66e08e6ca..a733237a5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -159,7 +159,7 @@ class PlannerSuite extends SharedSQLContext {
 
       withTempTable("testPushed") {
         val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
-        assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
+        assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index bd51154c58..d2947676a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -74,10 +74,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     selectedFilters.foreach { pred =>
       val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
       assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-      maybeFilter.foreach { f =>
-        // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-        assert(f.getClass === filterClass)
-      }
+      // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+      maybeFilter.exists(_.getClass === filterClass)
     }
     checker(stripSparkFilter(query), expected)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index c94e73c4aa..6ca334dc6d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
       (implicit df: DataFrame): Unit = {
     def checkComparisonOperator(filter: SearchArgument) = {
-      val operator = filter.getLeaves.asScala.head.getOperator
-      assert(operator === filterOperator)
+      val operator = filter.getLeaves.asScala
+      assert(operator.map(_.getOperator).contains(filterOperator))
     }
     checkFilterPredicate(df, predicate, checkComparisonOperator)
   }
@@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     )
     checkFilterPredicate(
       !('_1 < 4),
-      """leaf-0 = (LESS_THAN _1 4)
-        |expr = (not leaf-0)""".stripMargin.trim
+      """leaf-0 = (IS_NULL _1)
+        |leaf-1 = (LESS_THAN _1 4)
+        |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
     )
     checkFilterPredicate(
       '_1 < 2 || '_1 > 3,
@@ -227,9 +228,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     )
     checkFilterPredicate(
       '_1 < 2 && '_1 > 3,
-      """leaf-0 = (LESS_THAN _1 2)
-        |leaf-1 = (LESS_THAN_EQUALS _1 3)
-        |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+      """leaf-0 = (IS_NULL _1)
+        |leaf-1 = (LESS_THAN _1 2)
+        |leaf-2 = (LESS_THAN_EQUALS _1 3)
+        |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
     )
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9ab3e11609..e64bb77a03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
       }
 
       markup("Checking pushed filters")
-      assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
+      assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters))
 
       val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
       val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
      val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
 
       markup("Checking unhandled and inconvertible filters")
-      assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
+      assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters))
 
       markup("Checking partitioning filters")
       val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9cdf1fc585..bb552d6aa3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -141,12 +141,17 @@ class SimpleTextRelation(
     // Constructs a filter predicate to simulate filter push-down
     val predicate = {
       val filterCondition: Expression = filters.collect {
-        // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter
+        // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and
+        // `IsNotNull` filters
         case sources.GreaterThan(column, value) =>
           val dataType = dataSchema(column).dataType
           val literal = Literal.create(value, dataType)
           val attribute = inputAttributes.find(_.name == column).get
           expressions.GreaterThan(attribute, literal)
+        case sources.IsNotNull(column) =>
+          val dataType = dataSchema(column).dataType
+          val attribute = inputAttributes.find(_.name == column).get
+          expressions.IsNotNull(attribute)
       }.reduceOption(expressions.And).getOrElse(Literal(true))
       InterpretedPredicate.create(filterCondition, inputAttributes)
     }
@@ -184,11 +189,12 @@ class SimpleTextRelation(
     }
   }
 
-  // `SimpleTextRelation` only handles `GreaterThan` filter. This is used to test filter push-down
-  // and `BaseRelation.unhandledFilters()`.
+  // `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters. This is used to test
+  // filter push-down and `BaseRelation.unhandledFilters()`.
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
     filters.filter {
       case _: GreaterThan => false
+      case _: IsNotNull => false
       case _ => true
     }
   }
-- 
cgit v1.2.3
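For completeness, a small standalone sketch (editorial, not from the patch) of the data-source side of this change: a source that, like `SimpleTextRelation` above, reports `GreaterThan` and `IsNotNull` as handled so the newly inserted null filters can be pushed down rather than re-evaluated by Spark. The object name `DemoFilterHandling` is hypothetical; only the filter-handling logic is shown.

```scala
// Editorial sketch mirroring the SimpleTextRelation.unhandledFilters change above.
// DemoFilterHandling is a made-up name, not part of the patch.
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

object DemoFilterHandling {
  // Filters returned from here are re-evaluated by Spark after the scan;
  // anything not returned is trusted to be fully handled by the data source.
  def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    filters.filter {
      case _: GreaterThan => false // already handled, as before this patch
      case _: IsNotNull => false   // newly handled: the optimizer now pushes these
      case _ => true               // everything else is left to Spark
    }
  }
}
```

With such a source, the physical plan string advertises the extra predicate, e.g. `PushedFilters: [IsNotNull(key), EqualTo(key,15)]`, which is exactly what the `PlannerSuite` assertion above now checks.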