From cfa5ae84ed0f48b3b108d0614dbf6fcd79ef5179 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Jul 2016 18:37:15 -0700 Subject: [SPARK-16644][SQL] Aggregate should not propagate constraints containing aggregate expressions ## What changes were proposed in this pull request? aggregate expressions can only be executed inside `Aggregate`, if we propagate it up with constraints, the parent operator can not execute it and will fail at runtime. ## How was this patch tested? new test in SQLQuerySuite Author: Wenchen Fan Author: Yin Huai Closes #14281 from cloud-fan/bug. --- .../catalyst/plans/logical/basicLogicalOperators.scala | 6 ++++-- .../sql/catalyst/plans/ConstraintPropagationSuite.scala | 6 ++++-- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index c0e400f617..b31f5aa11c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -483,8 +483,10 @@ case class Aggregate( override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) override def maxRows: Option[Long] = child.maxRows - override def validConstraints: Set[Expression] = - child.constraints.union(getAliasedConstraints(aggregateExpressions)) + override def validConstraints: Set[Expression] = { + val nonAgg = aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty) + child.constraints.union(getAliasedConstraints(nonAgg)) + } override lazy val statistics: Statistics = { if (groupingExpressions.isEmpty) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 0b73b5e009..5a76969235 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -79,13 +79,15 @@ class ConstraintPropagationSuite extends SparkFunSuite { assert(tr.analyze.constraints.isEmpty) val aliasedRelation = tr.where('c.attr > 10 && 'a.attr < 5) - .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a).analyze + .groupBy('a, 'c, 'b)('a, 'c.as("c1"), count('a).as("a3")).select('c1, 'a, 'a3).analyze + // SPARK-16644: aggregate expression count(a) should not appear in the constraints. verifyConstraints(aliasedRelation.analyze.constraints, ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "c1") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "c1")), resolveColumn(aliasedRelation.analyze, "a") < 5, - IsNotNull(resolveColumn(aliasedRelation.analyze, "a"))))) + IsNotNull(resolveColumn(aliasedRelation.analyze, "a")), + IsNotNull(resolveColumn(aliasedRelation.analyze, "a3"))))) } test("propagating constraints in expand") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index eeaa0103a0..7513640582 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2965,4 +2965,21 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-16644: Aggregate should not put aggregate expressions to constraints") { + withTable("tbl") { + sql("CREATE TABLE tbl(a INT, b INT) USING parquet") + checkAnswer(sql( + """ + |SELECT + | a, + | MAX(b) AS c1, + | b AS c2 + |FROM tbl + |WHERE a = b + |GROUP BY a, b + |HAVING c1 = 1 + """.stripMargin), Nil) + } + } } -- cgit v1.2.3