author    Sameer Agarwal <sameer@databricks.com>  2016-03-07 12:04:59 -0800
committer Yin Huai <yhuai@databricks.com>  2016-03-07 12:04:59 -0800
commit    ef77003178eb5cdcb4fe519fc540917656c5d577
tree      e98a1feca6b9a8b80a767d938a4bf31e9c61d9af
parent    489641117651d11806d2773b7ded7c163d0260e5
[SPARK-13495][SQL] Add Null Filters in the query plan for Filters/Joins based on their data constraints
## What changes were proposed in this pull request?

This PR adds an optimizer rule to eliminate reading (unnecessary) NULL values if they are not required for correctness, by inserting `IsNotNull` filters in the query plan. These filters are currently inserted beneath existing `Filter` and `Join` operators and are inferred based on their data constraints.

Note: While this optimization is applicable to all types of joins, it primarily benefits `Inner` and `LeftSemi` joins.

## How was this patch tested?

1. Added a new `NullFilteringSuite` that tests for `IsNotNull` filters in the query plan for joins and filters, and also tests the interaction with the `CombineFilters` optimizer rule.
2. Tested the generated expression trees via `OrcFilterSuite`.
3. Tested the filter source pushdown logic via `SimpleTextHadoopFsRelationSuite`.

cc yhuai nongli

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11372 from sameeragarwal/gen-isnotnull.
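As an illustration (not part of the commit itself), the minimal sketch below shows the kind of rewrite `NullFiltering` performs, expressed with the Catalyst test DSL also used in `NullFilteringSuite`; the relation and column names here are hypothetical:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.IsNotNull
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A hypothetical two-column relation.
val relation = LocalRelation('a.int, 'b.int)

// Original plan: SELECT * FROM relation WHERE a = 1
val original = relation.where('a === 1).analyze

// After NullFiltering, an IsNotNull(a) predicate inferred from the filter's
// constraints is prepended to the condition, so data sources can skip rows
// where `a` is NULL.
val expected = relation.where(IsNotNull('a) && 'a === 1).analyze
```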
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 49
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala | 95
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 12
8 files changed, 182 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c83ec0fcb5..69ceea6329 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -77,6 +77,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
+ NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
@@ -594,6 +595,54 @@ object NullPropagation extends Rule[LogicalPlan] {
}
/**
+ * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
+ * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
+ * existing Filters and Join operators and are inferred based on their data constraints.
+ *
+ * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
+ * LeftSemi joins.
+ */
+object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter @ Filter(condition, child) =>
+ // We generate a list of additional isNotNull filters from the operator's existing constraints
+ // but remove those that are either already part of the filter condition or are part of the
+ // operator's child constraints.
+ val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+ (child.constraints ++ splitConjunctivePredicates(condition))
+ if (newIsNotNullConstraints.nonEmpty) {
+ Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+ } else {
+ filter
+ }
+
+ case join @ Join(left, right, joinType, condition) =>
+ val leftIsNotNullConstraints = join.constraints
+ .filter(_.isInstanceOf[IsNotNull])
+ .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
+ val rightIsNotNullConstraints =
+ join.constraints
+ .filter(_.isInstanceOf[IsNotNull])
+ .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
+ val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
+ Filter(leftIsNotNullConstraints.reduce(And), left)
+ } else {
+ left
+ }
+ val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
+ Filter(rightIsNotNullConstraints.reduce(And), right)
+ } else {
+ right
+ }
+ if (newLeftChild != left || newRightChild != right) {
+ Join(newLeftChild, newRightChild, joinType, condition)
+ } else {
+ join
+ }
+ }
+}
+
+/**
* Replaces [[Expression Expressions]] that can be statically evaluated with
* equivalent [[Literal]] values.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
new file mode 100644
index 0000000000..7e52d5ef67
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+class NullFilteringSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches = Batch("NullFiltering", Once, NullFiltering) ::
+ Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("filter: filter out nulls in condition") {
+ val originalQuery = testRelation.where('a === 1).analyze
+ val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join: filter out nulls on either side on equi-join keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y,
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+ val left = x.where(IsNotNull('a) && IsNotNull('b))
+ val right = y.where(IsNotNull('a) && IsNotNull('c))
+ val correctAnswer = left.join(right,
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join with pre-existing filters: filter out nulls on either side") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.where('b > 5).join(y.where('c === 10),
+ condition = Some("x.a".attr === "y.a".attr)).analyze
+ val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
+ val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
+ val correctAnswer = left.join(right,
+ condition = Some("x.a".attr === "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single outer join: no null filters are generated") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y, FullOuter,
+ condition = Some("x.a".attr === "y.a".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, originalQuery)
+ }
+
+ test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
+ val t1 = testRelation.subquery('t1)
+ val t2 = testRelation.subquery('t2)
+ val t3 = testRelation.subquery('t3)
+ val t4 = testRelation.subquery('t4)
+
+ val originalQuery = t1
+ .join(t2, condition = Some("t1.b".attr === "t2.b".attr))
+ .join(t3, condition = Some("t2.b".attr === "t3.b".attr))
+ .join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
+ val correctAnswer = t1.where(IsNotNull('b))
+ .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
+ .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
+ .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index f9874088b5..0541844e0b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.util._
/**
* Provides helper methods for comparing plans.
*/
-abstract class PlanTest extends SparkFunSuite {
+abstract class PlanTest extends SparkFunSuite with PredicateHelper {
/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
@@ -39,10 +39,22 @@ abstract class PlanTest extends SparkFunSuite {
}
}
+ /**
+ * Normalizes the filter conditions that appear in the plan. For instance,
+ * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2)),
+ * etc., will all now be equivalent.
+ */
+ private def normalizeFilters(plan: LogicalPlan) = {
+ plan transform {
+ case filter @ Filter(condition: Expression, child: LogicalPlan) =>
+ Filter(splitConjunctivePredicates(condition).sortBy(_.hashCode()).reduce(And), child)
+ }
+ }
+
/** Fails the test if the two plans do not match */
protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
- val normalized1 = normalizeExprIds(plan1)
- val normalized2 = normalizeExprIds(plan2)
+ val normalized1 = normalizeFilters(normalizeExprIds(plan1))
+ val normalized2 = normalizeFilters(normalizeExprIds(plan2))
if (normalized1 != normalized2) {
fail(
s"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f66e08e6ca..a733237a5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -159,7 +159,7 @@ class PlannerSuite extends SharedSQLContext {
withTempTable("testPushed") {
val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
- assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
+ assert(exp.toString.contains("PushedFilters: [IsNotNull(key), EqualTo(key,15)]"))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index bd51154c58..d2947676a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -74,10 +74,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
selectedFilters.foreach { pred =>
val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
- maybeFilter.foreach { f =>
- // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
- assert(f.getClass === filterClass)
- }
+ // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+ maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index c94e73c4aa..6ca334dc6d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest {
(predicate: Predicate, filterOperator: PredicateLeaf.Operator)
(implicit df: DataFrame): Unit = {
def checkComparisonOperator(filter: SearchArgument) = {
- val operator = filter.getLeaves.asScala.head.getOperator
- assert(operator === filterOperator)
+ val operator = filter.getLeaves.asScala
+ assert(operator.map(_.getOperator).contains(filterOperator))
}
checkFilterPredicate(df, predicate, checkComparisonOperator)
}
@@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
checkFilterPredicate(
!('_1 < 4),
- """leaf-0 = (LESS_THAN _1 4)
- |expr = (not leaf-0)""".stripMargin.trim
+ """leaf-0 = (IS_NULL _1)
+ |leaf-1 = (LESS_THAN _1 4)
+ |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
)
checkFilterPredicate(
'_1 < 2 || '_1 > 3,
@@ -227,9 +228,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
checkFilterPredicate(
'_1 < 2 && '_1 > 3,
- """leaf-0 = (LESS_THAN _1 2)
- |leaf-1 = (LESS_THAN_EQUALS _1 3)
- |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+ """leaf-0 = (IS_NULL _1)
+ |leaf-1 = (LESS_THAN _1 2)
+ |leaf-2 = (LESS_THAN_EQUALS _1 3)
+ |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9ab3e11609..e64bb77a03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
}
markup("Checking pushed filters")
- assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
+ assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters))
val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
markup("Checking unhandled and inconvertible filters")
- assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
+ assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters))
markup("Checking partitioning filters")
val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9cdf1fc585..bb552d6aa3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -141,12 +141,17 @@ class SimpleTextRelation(
// Constructs a filter predicate to simulate filter push-down
val predicate = {
val filterCondition: Expression = filters.collect {
- // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter
+ // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and
+ // `isNotNull` filters
case sources.GreaterThan(column, value) =>
val dataType = dataSchema(column).dataType
val literal = Literal.create(value, dataType)
val attribute = inputAttributes.find(_.name == column).get
expressions.GreaterThan(attribute, literal)
+ case sources.IsNotNull(column) =>
+ val dataType = dataSchema(column).dataType
+ val attribute = inputAttributes.find(_.name == column).get
+ expressions.IsNotNull(attribute)
}.reduceOption(expressions.And).getOrElse(Literal(true))
InterpretedPredicate.create(filterCondition, inputAttributes)
}
@@ -184,11 +189,12 @@ class SimpleTextRelation(
}
}
- // `SimpleTextRelation` only handles `GreaterThan` filter. This is used to test filter push-down
- // and `BaseRelation.unhandledFilters()`.
+ // `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters. This is used to test
+ // filter push-down and `BaseRelation.unhandledFilters()`.
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
filters.filter {
case _: GreaterThan => false
+ case _: IsNotNull => false
case _ => true
}
}