aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-07-20 18:37:15 -0700
committerYin Huai <yhuai@databricks.com>2016-07-20 18:37:15 -0700
commitcfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179 (patch)
tree1b529627a22b96cfc04cea4506c717a3737f1698
parent75a06aa256aa256c112555609a93c1e1dbb1cb4b (diff)
downloadspark-cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179.tar.gz
spark-cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179.tar.bz2
spark-cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179.zip
[SPARK-16644][SQL] Aggregate should not propagate constraints containing aggregate expressions
## What changes were proposed in this pull request? aggregate expressions can only be executed inside `Aggregate`, if we propagate it up with constraints, the parent operator can not execute it and will fail at runtime. ## How was this patch tested? new test in SQLQuerySuite Author: Wenchen Fan <wenchen@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #14281 from cloud-fan/bug.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala6
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala17
3 files changed, 25 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c0e400f617..b31f5aa11c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -483,8 +483,10 @@ case class Aggregate(
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
override def maxRows: Option[Long] = child.maxRows
- override def validConstraints: Set[Expression] =
- child.constraints.union(getAliasedConstraints(aggregateExpressions))
+ override def validConstraints: Set[Expression] = {
+ val nonAgg = aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
+ child.constraints.union(getAliasedConstraints(nonAgg))
+ }
override lazy val statistics: Statistics = {
if (groupingExpressions.isEmpty) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 0b73b5e009..5a76969235 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -79,13 +79,15 @@ class ConstraintPropagationSuite extends SparkFunSuite {
assert(tr.analyze.constraints.isEmpty)
val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5)
- .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a).analyze
+ .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3).analyze
+ // SPARK-16644: aggregate expression count(a) should not appear in the constraints.
verifyConstraints(aliasedRelation.analyze.constraints,
ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "c1") > 10,
IsNotNull(resolveColumn(aliasedRelation.analyze, "c1")),
resolveColumn(aliasedRelation.analyze, "a") < 5,
- IsNotNull(resolveColumn(aliasedRelation.analyze, "a")))))
+ IsNotNull(resolveColumn(aliasedRelation.analyze, "a")),
+ IsNotNull(resolveColumn(aliasedRelation.analyze, "a3")))))
}
test("propagating constraints in expand") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index eeaa0103a0..7513640582 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2965,4 +2965,21 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") {
+ withTable("tbl") {
+ sql("CREATE TABLE tbl(a INT, b INT) USING parquet")
+ checkAnswer(sql(
+ """
+ |SELECT
+ | a,
+ | MAX(b) AS c1,
+ | b AS c2
+ |FROM tbl
+ |WHERE a = b
+ |GROUP BY a, b
+ |HAVING c1 = 1
+ """.stripMargin), Nil)
+ }
+ }
}