aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-03-16 16:26:51 -0700
committerYin Huai <yhuai@databricks.com>2016-03-16 16:26:51 -0700
commitf96997ba244a14c26e85a2475415a762d0c0d0a8 (patch)
tree8278b83a155fc09d736375890a043d9a0f8243d7 /sql/catalyst
parentb90c0206faeb0883fba1c79fe18aa72affb9988e (diff)
downloadspark-f96997ba244a14c26e85a2475415a762d0c0d0a8.tar.gz
spark-f96997ba244a14c26e85a2475415a762d0c0d0a8.tar.bz2
spark-f96997ba244a14c26e85a2475415a762d0c0d0a8.zip
[SPARK-13871][SQL] Support for inferring filters from data constraints
## What changes were proposed in this pull request? This PR generalizes the `NullFiltering` optimizer rule in catalyst to `InferFiltersFromConstraints` that can automatically infer all relevant filters based on an operator's constraints while making sure of 2 things: (a) no redundant filters are generated, and (b) filters that do not contribute to any further optimizations are not generated. ## How was this patch tested? Extended all tests in `InferFiltersFromConstraintsSuite` (that were initially based on `NullFilteringSuite`) to test filter inference in `Filter` and `Join` operators. In particular, the 2 tests (`single inner join with pre-existing filters: filter out values on either side` and `multiple inner joins: filter out values on all sides on equi-join keys`) attempt to highlight/test the real potential of this rule for join optimization. Author: Sameer Agarwal <sameer@databricks.com> Closes #11665 from sameeragarwal/infer-filters.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala56
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala)69
2 files changed, 63 insertions, 62 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2de92d06ec..76f50a3dc3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -72,6 +72,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
LimitPushDown,
ColumnPruning,
EliminateOperators,
+ InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
@@ -79,7 +80,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
- NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
@@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] {
}
/**
- * Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
- * by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
- * existing Filters and Join operators and are inferred based on their data constraints.
+ * Generate a list of additional filters from an operator's existing constraints but remove those
+ * that are either already part of the operator's condition or are part of the operator's child
+ * constraints. These filters are currently inserted to the existing conditions in the Filter
+ * operators and on either side of Join operators.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
-object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
+object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
- // We generate a list of additional isNotNull filters from the operator's existing constraints
- // but remove those that are either already part of the filter condition or are part of the
- // operator's child constraints.
- val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+ val newFilters = filter.constraints --
(child.constraints ++ splitConjunctivePredicates(condition))
- if (newIsNotNullConstraints.nonEmpty) {
- Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
+ if (newFilters.nonEmpty) {
+ Filter(And(newFilters.reduce(And), condition), child)
} else {
filter
}
- case join @ Join(left, right, joinType, condition) =>
- val leftIsNotNullConstraints = join.constraints
- .filter(_.isInstanceOf[IsNotNull])
- .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
- val rightIsNotNullConstraints =
- join.constraints
- .filter(_.isInstanceOf[IsNotNull])
- .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
- val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
- Filter(leftIsNotNullConstraints.reduce(And), left)
- } else {
- left
- }
- val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
- Filter(rightIsNotNullConstraints.reduce(And), right)
- } else {
- right
- }
- if (newLeftChild != left || newRightChild != right) {
- Join(newLeftChild, newRightChild, joinType, condition)
- } else {
- join
+ case join @ Join(left, right, joinType, conditionOpt) =>
+ // Only consider constraints that can be pushed down completely to either the left or the
+ // right child
+ val constraints = join.constraints.filter { c =>
+ c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)}
+ // Remove those constraints that are already enforced by either the left or the right child
+ val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
+ val newConditionOpt = conditionOpt match {
+ case Some(condition) =>
+ val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
+ if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+ case None =>
+ additionalConstraints.reduceOption(And)
}
+ if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index 142e4ae6e4..e7fdd5a620 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-class NullFilteringSuite extends PlanTest {
+class InferFiltersFromConstraintsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
- val batches = Batch("NullFiltering", Once, NullFiltering) ::
+ val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
+ Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
- test("filter: filter out nulls in condition") {
- val originalQuery = testRelation.where('a === 1).analyze
- val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
+ test("filter: filter out constraints in condition") {
+ val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
+ val correctAnswer = testRelation
+ .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
- test("single inner join: filter out nulls on either side on equi-join keys") {
+ test("single inner join: filter out values on either side on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
- condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b))
- val right = y.where(IsNotNull('a) && IsNotNull('c))
- val correctAnswer = left.join(right,
- condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
.analyze
+ val left = x.where(IsNotNull('a) && "x.a".attr === 1)
+ val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
+ val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
@@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest {
val originalQuery = x.join(y,
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b))
- val right = y.where(IsNotNull('a) && IsNotNull('c))
- val correctAnswer = left.join(right,
- condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
- .analyze
+ val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
+ val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
+ val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
- test("single inner join with pre-existing filters: filter out nulls on either side") {
+ test("single inner join with pre-existing filters: filter out values on either side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
- val originalQuery = x.where('b > 5).join(y.where('c === 10),
- condition = Some("x.a".attr === "y.a".attr)).analyze
- val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
- val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
+ val originalQuery = x.where('b > 5).join(y.where('a === 10),
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
+ val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
+ val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
val correctAnswer = left.join(right,
- condition = Some("x.a".attr === "y.a".attr)).analyze
+ condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
@@ -92,20 +90,33 @@ class NullFilteringSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}
- test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
+ test("multiple inner joins: filter out values on all sides on equi-join keys") {
val t1 = testRelation.subquery('t1)
val t2 = testRelation.subquery('t2)
val t3 = testRelation.subquery('t3)
val t4 = testRelation.subquery('t4)
- val originalQuery = t1
+ val originalQuery = t1.where('b > 5)
.join(t2, condition = Some("t1.b".attr === "t2.b".attr))
.join(t3, condition = Some("t2.b".attr === "t3.b".attr))
.join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
- val correctAnswer = t1.where(IsNotNull('b))
- .join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
- .join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
- .join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
+ val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
+ .join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
+ .join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
+ .join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
+ .analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("inner join with filter: filter out values on all sides on equi-join keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+
+ val originalQuery =
+ x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze
+ val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5)
+ .join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}