aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
author Yin Huai <yhuai@databricks.com> 2015-02-17 17:50:39 -0800
committer Michael Armbrust <michael@databricks.com> 2015-02-17 17:50:39 -0800
commit d5f12bfe8f0a98d6fee114bb24376668ebe2898e (patch)
tree e7c620cdda67749bc9877f023d090519c540884a /sql/catalyst
parent a51fc7ef9adb6a41c4857918217f800858fced2c (diff)
download spark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.tar.gz
spark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.tar.bz2
spark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.zip
[SPARK-5875][SQL] logical.Project should not be resolved if it contains aggregates or generators
https://issues.apache.org/jira/browse/SPARK-5875 has a case that reproduces the bug and explains the root cause. Author: Yin Huai <yhuai@databricks.com>. Closes #4663 from yhuai/projectResolved and squashes the following commits: 472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, its resolved field should be false.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 10
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 13
2 files changed, 22 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93274..89544add74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
+
+ override lazy val resolved: Boolean = {
+ val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+ case agg: AggregateExpression => agg
+ case generator: Generator => generator
+ }.nonEmpty
+ )
+
+ !expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators
+ }
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651e14..aec7847356 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(caseInsensitiveAnalyze(plan).resolved)
}
+ test("check project's resolved") {
+ assert(Project(testRelation.output, testRelation).resolved)
+
+ assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved)
+
+ val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = true)())
+ assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved)
+
+ assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation).resolved)
+ }
+
test("analyze project") {
assert(
caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===