aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-12-11 22:51:49 -0800
committerMichael Armbrust <michael@databricks.com>2014-12-11 22:52:07 -0800
commitc82e99d87a8c37a1c4c2b24ed54e6295decf9117 (patch)
tree2410d457154723417639ae264e518684013a9908 /sql
parentc3b0713838797c9676be4ddae7cfcacae1775dd2 (diff)
downloadspark-c82e99d87a8c37a1c4c2b24ed54e6295decf9117.tar.gz
spark-c82e99d87a8c37a1c4c2b24ed54e6295decf9117.tar.bz2
spark-c82e99d87a8c37a1c4c2b24ed54e6295decf9117.zip
[SPARK-4825] [SQL] CTAS fails to resolve when created using saveAsTable
Fix bug when query like:
```
test("save join to table") {
  val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
  sql("CREATE TABLE test1 (key INT, value STRING)")
  testData.insertInto("test1")
  sql("CREATE TABLE test2 (key INT, value STRING)")
  testData.insertInto("test2")
  testData.insertInto("test2")
  sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
  checkAnswer(
    table("test"),
    sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
}
```
Author: Cheng Hao <hao.cheng@intel.com>

Closes #3673 from chenghao-intel/spark_4825 and squashes the following commits:

e8cbd56 [Cheng Hao] alternate the pattern matching order for logical plan:CTAS
e004895 [Cheng Hao] fix bug

(cherry picked from commit 0abbff286220bbcbbf28fbd80b8c5bf59ff37ce2)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala9
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala13
3 files changed, 23 insertions, 1 deletion
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 00bdf108a8..64b8d45ebb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -121,7 +121,7 @@ case class CreateTableAsSelect[T](
allowExisting: Boolean,
desc: Option[T] = None) extends UnaryNode {
override def output = Seq.empty[Attribute]
- override lazy val resolved = (databaseName != None && childrenResolved)
+ override lazy val resolved = databaseName != None && childrenResolved
}
case class WriteToFile(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 60865638e1..d8b10b78c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -261,6 +261,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
+ // TODO extra is in type of ASTNode which means the logical plan is not resolved
+ // Need to think about how to implement the CreateTableAsSelect.resolved
case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
@@ -285,6 +287,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
+
+ case p: LogicalPlan if p.resolved => p
+
+ case p @ CreateTableAsSelect(db, tableName, child, allowExisting, None) =>
+ val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+ val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+ CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, None)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b341eae512..96f3430207 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -137,6 +137,19 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
}
+ test("SPARK-4825 save join to table") {
+ val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
+ sql("CREATE TABLE test1 (key INT, value STRING)")
+ testData.insertInto("test1")
+ sql("CREATE TABLE test2 (key INT, value STRING)")
+ testData.insertInto("test2")
+ testData.insertInto("test2")
+ sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
+ checkAnswer(
+ table("test"),
+ sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
+ }
+
test("SPARK-3708 Backticks aren't handled correctly is aliases") {
checkAnswer(
sql("SELECT k FROM (SELECT `key` AS `k` FROM src) a"),