From 51b1fe1426ffecac6c4644523633ea1562ff9a4e Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 8 Dec 2014 17:39:12 -0800
Subject: [SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

This is the code refactoring and follow-up for #2570

Author: Cheng Hao

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 26 ++++++++++++++++++++--
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 +++++++++---
 .../sql/hive/execution/CreateTableAsSelect.scala   | 16 +++++--------
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  9 ++++++++
 4 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a157785d..60865638e1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
+    import org.apache.hadoop.hive.ql.Context
+    import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
-        CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
+
+        // Get the CreateTableDesc from Hive SemanticAnalyzer
+        val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+          None
+        } else {
+          val sa = new SemanticAnalyzer(hive.hiveconf) {
+            override def analyzeInternal(ast: ASTNode) {
+              // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+              // to ignore the SELECT clause of the CTAS
+              val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+                "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+              method.setAccessible(true)
+              method.invoke(this, ast, this.getQB)
+            }
+          }
+
+          sa.analyze(extra, new Context(hive.hiveconf))
+          Some(sa.getQB().getTableDesc)
+        }
+
+        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
     }
   }
 
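The analyzeInternal override in the hunk above reaches Hive's private analyzeCreateTable method through plain Java reflection, so only the CREATE TABLE half of the CTAS statement is analyzed eagerly while the SELECT half is left to Spark SQL. A minimal, self-contained sketch of that reflection technique follows; Analyzer, Ast, and the method names here are illustrative stand-ins, not Hive's real classes:

    class Ast(val text: String)

    class Analyzer {
      // Stands in for SemanticAnalyzer.analyzeCreateTable: private, so it is
      // not callable from a subclass without reflection.
      private def analyzeCreateTable(ast: Ast): String =
        "analyzed CREATE part of: " + ast.text

      def analyze(ast: Ast): String =
        analyzeCreateTable(ast) + " (plus SELECT analysis)"
    }

    class CreateOnlyAnalyzer extends Analyzer {
      // Mirrors the patched analyzeInternal: bypass the public entry point
      // and invoke the private method via java.lang.reflect.
      def analyzeCreateTableOnly(ast: Ast): String = {
        val method = classOf[Analyzer].getDeclaredMethod("analyzeCreateTable", classOf[Ast])
        method.setAccessible(true) // lift the private access restriction
        method.invoke(this, ast).asInstanceOf[String]
      }
    }

    object ReflectionSketch extends App {
      val ast = new Ast("CREATE TABLE t AS SELECT 1")
      println(new CreateOnlyAnalyzer().analyzeCreateTableOnly(ast))
    }

The same caveat applies as in the real code: the reflective lookup is pinned to the private method's exact runtime name and parameter types, which is why the patch passes classOf[ASTNode] and classOf[QB] explicitly.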
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f917..5f02e95ac3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
         execution.InsertIntoHiveTable(
           table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.CreateTableAsSelect(
-          Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
-        CreateTableAsSelect(
+          Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
+        execution.CreateTableAsSelect(
           database,
           tableName,
           child,
           allowExisting,
-          extra) :: Nil
+          Some(desc)) :: Nil
+      case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
+        execution.CreateTableAsSelect(
+          database,
+          tableName,
+          child,
+          allowExisting,
+          None) :: Nil
       case _ => Nil
     }
   }
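Both new strategy cases above build the same physical execution.CreateTableAsSelect node; they are split because the logical node carries the table description as an untyped optional payload, and the match on Some(desc: CreateTableDesc) is what recovers the concrete type. A standalone sketch of that shape, assuming an Option[Any] payload (an assumption for illustration, not the literal Spark 1.2 signature):

    object StrategySketch extends App {
      case class TableDesc(serde: String) // stand-in for Hive's CreateTableDesc

      // Mimics the two planner cases: a typed Some(...) match recovers the
      // description, while None means the target table already exists and
      // no description was built during analysis.
      def plan(desc: Option[Any]): String = desc match {
        case Some(d: TableDesc) => "CTAS with pre-built desc, serde = " + d.serde
        case None               => "CTAS without desc (table already exists)"
        case _                  => "unrecognized payload, handled by other strategies"
      }

      println(plan(Some(TableDesc("LazySimpleSerDe")))) // typed payload
      println(plan(None))                               // no description
    }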
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3d24d87bc3..b83689ceab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  * @param query the query whose result will be insert into the new relation
  * @param allowExisting allow continue working if it's already exists, otherwise
  *                      raise exception
- * @param extra the extra information for this Operator, it should be the
- *              ASTNode object for extracting the CreateTableDesc.
+ * @param desc the CreateTableDesc, which may contain the serde, storage handler, etc.
  */
 @Experimental
 
@@ -45,7 +44,7 @@ case class CreateTableAsSelect(
     tableName: String,
     query: LogicalPlan,
     allowExisting: Boolean,
-    extra: ASTNode) extends LeafNode with Command {
+    desc: Option[CreateTableDesc]) extends LeafNode with Command {
 
   def output = Seq.empty
 
@@ -53,13 +52,8 @@ case class CreateTableAsSelect(
 
   // A lazy computing of the metastoreRelation
   private[this] lazy val metastoreRelation: MetastoreRelation = {
-    // Get the CreateTableDesc from Hive SemanticAnalyzer
-    val sa = new SemanticAnalyzer(sc.hiveconf)
-
-    sa.analyze(extra, new Context(sc.hiveconf))
-    val desc = sa.getQB().getTableDesc
     // Create Hive Table
-    sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
+    sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
 
     // Get the Metastore Relation
     sc.catalog.lookupRelation(Some(database), tableName, None) match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943ff8..b341eae512 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
+
+    checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
+      Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT * FROM test_ctas_1234"),
+      sql("SELECT * FROM nested").collect().toSeq)
+
+    intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+      sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
+    }
   }
 
   test("test CTAS") {
-- 
cgit v1.2.3
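For context, the new regression test exercises CTAS over the temporary table nested. A REPL-style sketch of the same behavior against the Spark 1.2-era API, where sc, the table names, and the one-column schema are assumptions for illustration:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    import hiveContext._ // brings sql() and the createSchemaRDD implicit into scope

    case class Rec(id: Int)
    // Register an in-memory RDD as a temporary table...
    sc.parallelize(1 to 3).map(Rec(_)).registerTempTable("tmp_src")

    // ...then run CTAS against it; before this patch the temporary table
    // was not resolved during CreateTableAsSelect analysis.
    sql("CREATE TABLE ctas_from_tmp AS SELECT * FROM tmp_src")
    sql("SELECT * FROM ctas_from_tmp").collect()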