aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Sameer Agarwal <sameer@databricks.com>, 2016-03-10 12:16:46 -0800
committer: Yin Huai <yhuai@databricks.com>, 2016-03-10 12:16:46 -0800
commit: 19f4ac6dc766a3d33b04cd071b414c3bea88e97d (patch)
tree: a4d886737874cfd68fa371b747e3411bafcd9b37 /sql/catalyst
parent: 235f4ac6fc05802a00889a3a0b39377711cbc7e3 (diff)
download: spark-19f4ac6dc766a3d33b04cd071b414c3bea88e97d.tar.gz
spark-19f4ac6dc766a3d33b04cd071b414c3bea88e97d.tar.bz2
spark-19f4ac6dc766a3d33b04cd071b414c3bea88e97d.zip
[SPARK-13759][SQL] Add IsNotNull constraints for expressions with an inequality
## What changes were proposed in this pull request?

This PR adds support for inferring `IsNotNull` constraints from expressions with an `!==`. More specifically, if an operator has a condition on `a !== b`, we know that both `a` and `b` in the operator output can no longer be null.

## How was this patch tested?

1. Modified a test in `ConstraintPropagationSuite` to test for expressions with an inequality.
2. Added a test in `NullFilteringSuite` to make sure that an inner join with a "non-equal" condition appropriately filters out nulls from its inputs.

cc nongli

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11594 from sameeragarwal/isnotequal-constraints.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala | 21
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala | 4
3 files changed, 23 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 371d72ef5a..40c06ed6d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -56,6 +56,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
Set(IsNotNull(l), IsNotNull(r))
case LessThanOrEqual(l, r) =>
Set(IsNotNull(l), IsNotNull(r))
+ case Not(EqualTo(l, r)) =>
+ Set(IsNotNull(l), IsNotNull(r))
case _ =>
Set.empty[Expression]
}.foldLeft(Set.empty[Expression])(_ union _.toSet)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
index 7e52d5ef67..142e4ae6e4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -44,11 +44,28 @@ class NullFilteringSuite extends PlanTest {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
- condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+ condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ .analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
- condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+ condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ .analyze
+ val optimized = Optimize.execute(originalQuery)
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("single inner join: filter out nulls on either side on non equal keys") {
+ val x = testRelation.subquery('x)
+ val y = testRelation.subquery('y)
+ val originalQuery = x.join(y,
+ condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ .analyze
+ val left = x.where(IsNotNull('a) && IsNotNull('b))
+ val right = y.where(IsNotNull('a) && IsNotNull('c))
+ val correctAnswer = left.join(right,
+ condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+ .analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 868ad934da..e70d3794ab 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -64,10 +64,10 @@ class ConstraintPropagationSuite extends SparkFunSuite {
verifyConstraints(tr
.where('a.attr > 10)
.select('c.attr, 'a.attr)
- .where('c.attr < 100)
+ .where('c.attr =!= 100)
.analyze.constraints,
ExpressionSet(Seq(resolveColumn(tr, "a") > 10,
- resolveColumn(tr, "c") < 100,
+ resolveColumn(tr, "c") =!= 100,
IsNotNull(resolveColumn(tr, "a")),
IsNotNull(resolveColumn(tr, "c")))))
}