aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala27
2 files changed, 31 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 09c200fa83..a18efc90ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -519,7 +519,6 @@ case class Expand(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
-
override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))
@@ -527,6 +526,10 @@ case class Expand(
val sizeInBytes = super.statistics.sizeInBytes * projections.length
Statistics(sizeInBytes = sizeInBytes)
}
+
+ // This operator can reuse attributes (for example making them null when doing a roll up) so
+ // the contraints of the child may no longer be valid.
+ override protected def validConstraints: Set[Expression] = Set.empty[Expression]
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 5cbb889f8e..49c1353efb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -88,6 +88,33 @@ class ConstraintPropagationSuite extends SparkFunSuite {
IsNotNull(resolveColumn(aliasedRelation.analyze, "a")))))
}
+ test("propagating constraints in expand") {
+ val tr = LocalRelation('a.int, 'b.int, 'c.int)
+
+ assert(tr.analyze.constraints.isEmpty)
+
+ // We add IsNotNull constraints for 'a, 'b and 'c into LocalRelation
+ // by creating notNullRelation.
+ val notNullRelation = tr.where('c.attr > 10 && 'a.attr < 5 && 'b.attr > 2)
+ verifyConstraints(notNullRelation.analyze.constraints,
+ ExpressionSet(Seq(resolveColumn(notNullRelation.analyze, "c") > 10,
+ IsNotNull(resolveColumn(notNullRelation.analyze, "c")),
+ resolveColumn(notNullRelation.analyze, "a") < 5,
+ IsNotNull(resolveColumn(notNullRelation.analyze, "a")),
+ resolveColumn(notNullRelation.analyze, "b") > 2,
+ IsNotNull(resolveColumn(notNullRelation.analyze, "b")))))
+
+ val expand = Expand(
+ Seq(
+ Seq('c, Literal.create(null, StringType), 1),
+ Seq('c, 'a, 2)),
+ Seq('c, 'a, 'gid.int),
+ Project(Seq('a, 'c),
+ notNullRelation))
+ verifyConstraints(expand.analyze.constraints,
+ ExpressionSet(Seq.empty[Expression]))
+ }
+
test("propagating constraints in aliases") {
val tr = LocalRelation('a.int, 'b.string, 'c.int)