diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-11-30 19:40:58 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-11-30 19:40:58 +0800 |
commit | 2eb093decb5e87a1ea71bbaa28092876a8c84996 (patch) | |
tree | 00251a85adfdafa9c927d9c74f7cdf57f6e05179 /sql/catalyst/src | |
parent | c5a64d760600ff430899e401751c41dc6b27cee6 (diff) | |
download | spark-2eb093decb5e87a1ea71bbaa28092876a8c84996.tar.gz spark-2eb093decb5e87a1ea71bbaa28092876a8c84996.tar.bz2 spark-2eb093decb5e87a1ea71bbaa28092876a8c84996.zip |
[SPARK-17897][SQL] Fixed IsNotNull Constraint Inference Rule
### What changes were proposed in this pull request?
The `constraints` of an operator are the expressions that evaluate to `true` for all the rows produced. That means the expression result should be neither `false` nor `unknown` (NULL). Thus, we can infer `IsNotNull` on all the constraints, whether they are generated by the operator's own predicates or propagated from its children. The constraint can be a complex expression. To make better use of these constraints, we try to push `IsNotNull` down to the lowest-level expressions (i.e., `Attribute`). `IsNotNull` can be pushed through an expression when that expression is null intolerant. (When the input is NULL, a null-intolerant expression always evaluates to NULL.)
Below is the existing code we have for `IsNotNull` pushdown.
```Scala
private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
case a: Attribute => Seq(a)
case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
expr.children.flatMap(scanNullIntolerantExpr)
case _ => Seq.empty[Attribute]
}
```
**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the constraint does not include any `Not`-like expression, the existing code works; otherwise, it can generate a wrong result. This PR fixes the above function by removing `IsNotNull` from the inference. After the fix, when a constraint contains an `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears at the root.
Without the fix, the following test case returns an empty result, even though the row whose `key` is `null` should pass the filter.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan looks like this:
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
+- LocalRelation [value#1]
```
After the fix, the optimized plan looks like this:
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
+- LocalRelation [value#1]
```
### How was this patch tested?
Added a test
Author: gatorsmile <gatorsmile@gmail.com>
Closes #16067 from gatorsmile/isNotNull2.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 27 | ||||
-rw-r--r-- | sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala | 9 |
2 files changed, 29 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 45ee2964d4..b108017c4c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -40,14 +40,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } /** - * Infers a set of `isNotNull` constraints from a given set of equality/comparison expressions as - * well as non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this + * Infers a set of `isNotNull` constraints from null intolerant expressions as well as + * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this * returns a constraint of the form `isNotNull(a)` */ private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = { // First, we propagate constraints from the null intolerant expressions. - var isNotNullConstraints: Set[Expression] = - constraints.flatMap(scanNullIntolerantExpr).map(IsNotNull(_)) + var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints) // Second, we infer additional constraints from non-nullable attributes that are part of the // operator's output @@ -58,13 +57,27 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } /** + * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions + * of constraints. + */ + private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] = + constraint match { + // When the root is IsNotNull, we can push IsNotNull through the child null intolerant + // expressions + case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + // Constraints always return true for all the inputs. 
That means, null will never be returned. + // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child + // null intolerant expressions. + case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + } + + /** * Recursively explores the expressions which are null intolerant and returns all attributes * in these expressions. */ - private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match { + private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match { case a: Attribute => Seq(a) - case _: NullIntolerant | IsNotNull(_: NullIntolerant) => - expr.children.flatMap(scanNullIntolerantExpr) + case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute) case _ => Seq.empty[Attribute] } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 8068ce922e..a191aa8fee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -351,6 +351,15 @@ class ConstraintPropagationSuite extends SparkFunSuite { IsNotNull(IsNotNull(resolveColumn(tr, "b"))), IsNotNull(resolveColumn(tr, "a")), IsNotNull(resolveColumn(tr, "c"))))) + + verifyConstraints( + tr.where('a.attr === 1 && IsNotNull(resolveColumn(tr, "b")) && + IsNotNull(resolveColumn(tr, "c"))).analyze.constraints, + ExpressionSet(Seq( + resolveColumn(tr, "a") === 1, + IsNotNull(resolveColumn(tr, "c")), + IsNotNull(resolveColumn(tr, "a")), + IsNotNull(resolveColumn(tr, "b"))))) } test("infer IsNotNull constraints from non-nullable attributes") { |