aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-03-07 12:04:59 -0800
committerYin Huai <yhuai@databricks.com>2016-03-07 12:04:59 -0800
commitef77003178eb5cdcb4fe519fc540917656c5d577 (patch)
treee98a1feca6b9a8b80a767d938a4bf31e9c61d9af /sql/hive
parent489641117651d11806d2773b7ded7c163d0260e5 (diff)
downloadspark-ef77003178eb5cdcb4fe519fc540917656c5d577.tar.gz
spark-ef77003178eb5cdcb4fe519fc540917656c5d577.tar.bz2
spark-ef77003178eb5cdcb4fe519fc540917656c5d577.zip
[SPARK-13495][SQL] Add Null Filters in the query plan for Filters/Joins based on their data constraints
## What changes were proposed in this pull request? This PR adds an optimizer rule to eliminate reading (unnecessary) NULL values if they are not required for correctness by inserting `isNotNull` filters in the query plan. These filters are currently inserted beneath existing `Filter` and `Join` operators and are inferred based on their data constraints. Note: While this optimization is applicable to all types of join, it primarily benefits `Inner` and `LeftSemi` joins. ## How was this patch tested? 1. Added a new `NullFilteringSuite` that tests for `IsNotNull` filters in the query plan for joins and filters. Also, tests interaction with the `CombineFilters` optimizer rule. 2. Test generated ExpressionTrees via `OrcFilterSuite` 3. Test filter source pushdown logic via `SimpleTextHadoopFsRelationSuite` cc yhuai nongli Author: Sameer Agarwal <sameer@databricks.com> Closes #11372 from sameeragarwal/gen-isnotnull.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala16
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala12
3 files changed, 20 insertions, 12 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index c94e73c4aa..6ca334dc6d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -61,8 +61,8 @@ class OrcFilterSuite extends QueryTest with OrcTest {
(predicate: Predicate, filterOperator: PredicateLeaf.Operator)
(implicit df: DataFrame): Unit = {
def checkComparisonOperator(filter: SearchArgument) = {
- val operator = filter.getLeaves.asScala.head.getOperator
- assert(operator === filterOperator)
+ val operator = filter.getLeaves.asScala
+ assert(operator.map(_.getOperator).contains(filterOperator))
}
checkFilterPredicate(df, predicate, checkComparisonOperator)
}
@@ -216,8 +216,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
checkFilterPredicate(
!('_1 < 4),
- """leaf-0 = (LESS_THAN _1 4)
- |expr = (not leaf-0)""".stripMargin.trim
+ """leaf-0 = (IS_NULL _1)
+ |leaf-1 = (LESS_THAN _1 4)
+ |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
)
checkFilterPredicate(
'_1 < 2 || '_1 > 3,
@@ -227,9 +228,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
)
checkFilterPredicate(
'_1 < 2 && '_1 > 3,
- """leaf-0 = (LESS_THAN _1 2)
- |leaf-1 = (LESS_THAN_EQUALS _1 3)
- |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+ """leaf-0 = (IS_NULL _1)
+ |leaf-1 = (LESS_THAN _1 2)
+ |leaf-2 = (LESS_THAN_EQUALS _1 3)
+ |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 9ab3e11609..e64bb77a03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -192,14 +192,14 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
}
markup("Checking pushed filters")
- assert(SimpleTextRelation.pushedFilters === pushedFilters.toSet)
+ assert(pushedFilters.toSet.subsetOf(SimpleTextRelation.pushedFilters))
val expectedInconvertibleFilters = inconvertibleFilters.map(_.expr).toSet
val expectedUnhandledFilters = unhandledFilters.map(_.expr).toSet
val expectedPartitioningFilters = partitioningFilters.map(_.expr).toSet
markup("Checking unhandled and inconvertible filters")
- assert(expectedInconvertibleFilters ++ expectedUnhandledFilters === nonPushedFilters)
+ assert((expectedInconvertibleFilters ++ expectedUnhandledFilters).subsetOf(nonPushedFilters))
markup("Checking partitioning filters")
val actualPartitioningFilters = splitConjunctivePredicates(filter.expr).filter {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9cdf1fc585..bb552d6aa3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -141,12 +141,17 @@ class SimpleTextRelation(
// Constructs a filter predicate to simulate filter push-down
val predicate = {
val filterCondition: Expression = filters.collect {
- // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter
+ // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` and
+ // `isNotNull` filters
case sources.GreaterThan(column, value) =>
val dataType = dataSchema(column).dataType
val literal = Literal.create(value, dataType)
val attribute = inputAttributes.find(_.name == column).get
expressions.GreaterThan(attribute, literal)
+ case sources.IsNotNull(column) =>
+ val dataType = dataSchema(column).dataType
+ val attribute = inputAttributes.find(_.name == column).get
+ expressions.IsNotNull(attribute)
}.reduceOption(expressions.And).getOrElse(Literal(true))
InterpretedPredicate.create(filterCondition, inputAttributes)
}
@@ -184,11 +189,12 @@ class SimpleTextRelation(
}
}
- // `SimpleTextRelation` only handles `GreaterThan` filter. This is used to test filter push-down
- // and `BaseRelation.unhandledFilters()`.
+ // `SimpleTextRelation` only handles `GreaterThan` and `IsNotNull` filters. This is used to test
+ // filter push-down and `BaseRelation.unhandledFilters()`.
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
filters.filter {
case _: GreaterThan => false
+ case _: IsNotNull => false
case _ => true
}
}