author    Cheng Hao <hao.cheng@intel.com>  2014-12-08 17:39:12 -0800
committer Michael Armbrust <michael@databricks.com>  2014-12-08 17:39:56 -0800
commit    31a6d4fede28d46cd379f788678cc33b0b982d14 (patch)
tree      057f309a741266642c820771a02f4b9a33a19ee0 /sql
parent    f4160324c55b4d168421af5473ce306bc03a77bb (diff)
[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables
This is the code refactoring and follow-up for #2570.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unit test
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS

(cherry picked from commit 51b1fe1426ffecac6c4644523633ea1562ff9a4e)
Signed-off-by: Michael Armbrust <michael@databricks.com>
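For context, the regression fixed here appears when a CTAS statement reads from a session-local temporary table. A minimal reproduction sketch, assuming a SparkContext `sc` and a HiveContext `hiveContext` (table names illustrative):

    // Register an in-memory temporary table, then materialize it with CTAS.
    // Before this fix, resolving the CTAS passed the whole AST to the Hive
    // SemanticAnalyzer, which cannot see session-local temp tables.
    val rdd = sc.parallelize((1 to 10).map(i => s"""{"a": $i, "b": "str$i"}"""))
    hiveContext.jsonRDD(rdd).registerTempTable("jt")
    hiveContext.sql("CREATE TABLE test_ctas AS SELECT a, b FROM jt")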
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala          | 26
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                | 14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala       |  9
4 files changed, 49 insertions(+), 16 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a157785d..60865638e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
+ import org.apache.hadoop.hive.ql.Context
+ import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+ case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
- CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
+ // Get the CreateTableDesc from Hive SemanticAnalyzer
+ val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+ None
+ } else {
+ val sa = new SemanticAnalyzer(hive.hiveconf) {
+ override def analyzeInternal(ast: ASTNode) {
+ // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+ // to ignore the SELECT clause of the CTAS
+ val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+ "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+ method.setAccessible(true)
+ method.invoke(this, ast, this.getQB)
+ }
+ }
+
+ sa.analyze(extra, new Context(hive.hiveconf))
+ Some(sa.getQB().getTableDesc)
+ }
+
+ CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
}
}
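The anonymous subclass above reaches the private `analyzeCreateTable` through reflection so that Hive analyzes only the CREATE TABLE clause while Catalyst keeps ownership of the SELECT. The reflection idiom in isolation (class and method names hypothetical):

    class Analyzer {
      // Private method that is not reachable through the public API.
      private def analyzePart(input: String): String = s"analyzed: $input"
    }

    // Look the method up by name and parameter types, lift the access
    // check, then invoke it on an instance.
    val target = new Analyzer
    val method = classOf[Analyzer].getDeclaredMethod("analyzePart", classOf[String])
    method.setAccessible(true)
    println(method.invoke(target, "CREATE TABLE t"))  // analyzed: CREATE TABLE t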
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f917..5f02e95ac3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
execution.InsertIntoHiveTable(
table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.CreateTableAsSelect(
- Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
- CreateTableAsSelect(
+ Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
+ execution.CreateTableAsSelect(
database,
tableName,
child,
allowExisting,
- extra) :: Nil
+ Some(desc)) :: Nil
+ case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
+ execution.CreateTableAsSelect(
+ database,
+ tableName,
+ child,
+ allowExisting,
+ None) :: Nil
case _ => Nil
}
}
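Two strategy cases are needed because the planner has to check both that the optional payload is present and that it has the expected runtime type; anything else falls through to the final wildcard. The same match shape in plain Scala (types hypothetical):

    case class TableDesc(serde: String)

    def plan(desc: Option[Any]): String = desc match {
      case Some(d: TableDesc) => s"create using serde ${d.serde}"  // typed payload
      case None               => "create with default settings"    // no descriptor
      case _                  => "not planned here"                // unexpected payload
    }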
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3d24d87bc3..b83689ceab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.hive.execution
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
* @param query the query whose result will be inserted into the new relation
* @param allowExisting if true, continue when the table already exists; otherwise
*                      raise an exception
- * @param extra the extra information for this Operator, it should be the
- * ASTNode object for extracting the CreateTableDesc.
+ * @param desc the CreateTableDesc, which may contain the serde, storage handler, etc.
*/
@Experimental
@@ -45,7 +44,7 @@ case class CreateTableAsSelect(
tableName: String,
query: LogicalPlan,
allowExisting: Boolean,
- extra: ASTNode) extends LeafNode with Command {
+ desc: Option[CreateTableDesc]) extends LeafNode with Command {
def output = Seq.empty
@@ -53,13 +52,8 @@ case class CreateTableAsSelect(
// Lazy computation of the metastoreRelation
private[this] lazy val metastoreRelation: MetastoreRelation = {
- // Get the CreateTableDesc from Hive SemanticAnalyzer
- val sa = new SemanticAnalyzer(sc.hiveconf)
-
- sa.analyze(extra, new Context(sc.hiveconf))
- val desc = sa.getQB().getTableDesc
// Create Hive Table
- sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
+ sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
// Get the Metastore Relation
sc.catalog.lookupRelation(Some(database), tableName, None) match {
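Note that `metastoreRelation` stays a `lazy val`: the Hive table is created on first access during execution, not when the physical plan is constructed. The deferral pattern in isolation (names hypothetical):

    class TableCreator(name: String) {
      // The body runs once, on first access, and the result is cached.
      lazy val relation: String = {
        println(s"creating table $name")  // side effect deferred until needed
        s"relation($name)"
      }
    }

    val c = new TableCreator("t")  // nothing created yet
    println(c.relation)            // prints "creating table t", then "relation(t)"
    println(c.relation)            // cached: prints only "relation(t)"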
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943ff8..b341eae512 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
checkAnswer(
sql("SELECT f1.f2.f3 FROM nested"),
1)
+ checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
+ Seq.empty[Row])
+ checkAnswer(
+ sql("SELECT * FROM test_ctas_1234"),
+ sql("SELECT * FROM nested").collect().toSeq)
+
+ intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+ sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
+ }
}
test("test CTAS") {