aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-02-17 17:50:39 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-17 17:50:39 -0800
commitd5f12bfe8f0a98d6fee114bb24376668ebe2898e (patch)
treee7c620cdda67749bc9877f023d090519c540884a /sql
parenta51fc7ef9adb6a41c4857918217f800858fced2c (diff)
downloadspark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.tar.gz
spark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.tar.bz2
spark-d5f12bfe8f0a98d6fee114bb24376668ebe2898e.zip
[SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators
https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the bug and explains the root cause. Author: Yin Huai <yhuai@databricks.com> Closes #4663 from yhuai/projectResolved and squashes the following commits: 472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, its resolved field should be false.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala10
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala32
3 files changed, 53 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93274..89544add74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
+
+  /**
+   * A Project counts as resolved only when every one of its expressions is resolved, its
+   * children are resolved, and no entry of the project list contains an AggregateExpression
+   * or a Generator anywhere in its expression tree.
+   */
+  override lazy val resolved: Boolean = {
+    // Walk each projected expression's tree looking for aggregates or generators.
+    val hasAggregateOrGenerator = projectList.exists { namedExpr =>
+      namedExpr.collect {
+        case agg: AggregateExpression => agg
+        case generator: Generator => generator
+      }.nonEmpty
+    }
+
+    expressions.forall(_.resolved) && childrenResolved && !hasAggregateOrGenerator
+  }
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651e14..aec7847356 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(caseInsensitiveAnalyze(plan).resolved)
}
+  test("check project's resolved") {
+    // Projecting the relation's own output over that relation is expected to be resolved.
+    val passThrough = Project(testRelation.output, testRelation)
+    assert(passThrough.resolved)
+
+    // An UnresolvedAttribute in the project list must leave the Project unresolved.
+    val withUnresolvedAttr = Project(Seq(UnresolvedAttribute("a")), testRelation)
+    assert(!withUnresolvedAttr.resolved)
+
+    // An Explode in the project list must leave the Project unresolved.
+    val explodedColumn = AttributeReference("a", IntegerType, nullable = true)()
+    val explodeAlias = Alias(Explode(Nil, explodedColumn), "explode")()
+    val withGenerator = Project(Seq(explodeAlias), testRelation)
+    assert(!withGenerator.resolved)
+
+    // A Count in the project list must leave the Project unresolved.
+    val countAlias = Alias(Count(Literal(1)), "count")()
+    val withAggregate = Project(Seq(countAlias), testRelation)
+    assert(!withAggregate.resolved)
+  }
+
test("analyze project") {
assert(
caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec3d8..ae03bc5e99 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -316,4 +316,34 @@ class SQLQuerySuite extends QueryTest {
dropTempTable("data")
}
+
+  test("logical.Project should not be resolved if it contains aggregates or generators") {
+    // This test is used to test the fix of SPARK-5875.
+    // The original issue was that Project's resolved will be true when it contains
+    // AggregateExpressions or Generators. However, in this case, the Project
+    // is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
+    // PreInsertionCasts will actually start to work before ImplicitGenerate and then
+    // generates an invalid query plan.
+    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i+1}]}"""))
+    jsonRDD(rdd).registerTempTable("data")
+    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+    setConf("spark.sql.hive.convertCTAS", "false")
+
+    // Run the body under try/finally so that a failing assertion does not leak the Hive
+    // table, the temp table, or the conf override into the other tests of this suite.
+    try {
+      sql("CREATE TABLE explodeTest (key bigInt)")
+      table("explodeTest").queryExecution.analyzed match {
+        case _: MetastoreRelation => // OK
+        case _ =>
+          fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
+      }
+
+      sql("INSERT OVERWRITE TABLE explodeTest SELECT explode(a) AS val FROM data")
+      checkAnswer(
+        sql("SELECT key from explodeTest"),
+        (1 to 5).flatMap(i => Row(i) :: Row(i + 1) :: Nil)
+      )
+    } finally {
+      // IF EXISTS keeps the cleanup from throwing (and masking the real failure) when the
+      // CREATE TABLE above did not succeed.
+      sql("DROP TABLE IF EXISTS explodeTest")
+      dropTempTable("data")
+      setConf("spark.sql.hive.convertCTAS", originalConf)
+    }
+  }
}