diff options
author | Cheng Hao <hao.cheng@intel.com> | 2014-12-11 22:51:49 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-12-11 22:51:49 -0800 |
commit | 0abbff286220bbcbbf28fbd80b8c5bf59ff37ce2 (patch) | |
tree | d820a4998585b6bce08431d090f02ab5993e5473 | |
parent | cbb634ae69163ca7a8b5cb94c5fb17cb5c910cb1 (diff) | |
download | spark-0abbff286220bbcbbf28fbd80b8c5bf59ff37ce2.tar.gz spark-0abbff286220bbcbbf28fbd80b8c5bf59ff37ce2.tar.bz2 spark-0abbff286220bbcbbf28fbd80b8c5bf59ff37ce2.zip |
[SPARK-4825] [SQL] CTAS fails to resolve when created using saveAsTable
Fix a bug that occurs when running a query like the following:
```
test("save join to table") {
val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
sql("CREATE TABLE test1 (key INT, value STRING)")
testData.insertInto("test1")
sql("CREATE TABLE test2 (key INT, value STRING)")
testData.insertInto("test2")
testData.insertInto("test2")
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
checkAnswer(
table("test"),
sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
}
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes #3673 from chenghao-intel/spark_4825 and squashes the following commits:
e8cbd56 [Cheng Hao] alternate the pattern matching order for logical plan:CTAS
e004895 [Cheng Hao] fix bug
3 files changed, 23 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 00bdf108a8..64b8d45ebb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -121,7 +121,7 @@ case class CreateTableAsSelect[T]( allowExisting: Boolean, desc: Option[T] = None) extends UnaryNode { override def output = Seq.empty[Attribute] - override lazy val resolved = (databaseName != None && childrenResolved) + override lazy val resolved = databaseName != None && childrenResolved } case class WriteToFile( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 60865638e1..d8b10b78c6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -261,6 +261,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // Wait until children are resolved. 
case p: LogicalPlan if !p.childrenResolved => p + // TODO extra is in type of ASTNode which means the logical plan is not resolved + // Need to think about how to implement the CreateTableAsSelect.resolved case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) => val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) @@ -285,6 +287,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with } CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc) + + case p: LogicalPlan if p.resolved => p + + case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) => + val (dbName, tblName) = processDatabaseAndTableName(db, tableName) + val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) + CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b341eae512..96f3430207 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -137,6 +137,19 @@ class SQLQuerySuite extends QueryTest { sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) } + test("SPARK-4825 save join to table") { + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)) + sql("CREATE TABLE test1 (key INT, value STRING)") + testData.insertInto("test1") + sql("CREATE TABLE test2 (key INT, value STRING)") + testData.insertInto("test2") + testData.insertInto("test2") + sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test") + checkAnswer( + table("test"), + sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = 
b.key").collect().toSeq) + } + test("SPARK-3708 Backticks aren't handled correctly is aliases") { checkAnswer( sql("SELECT k FROM (SELECT `key` AS `k` FROM src) a"), |