diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-08-26 08:52:10 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-08-26 08:52:10 -0700 |
commit | 28ab17922a227e8d93654d3478c0d493bfb599d5 (patch) | |
tree | b06cd32184549eb1a22ec9b7bd55398f7ebcf40b | |
parent | 6063d5963fcf01768570c1a9b542be6175a3bcbc (diff) | |
download | spark-28ab17922a227e8d93654d3478c0d493bfb599d5.tar.gz spark-28ab17922a227e8d93654d3478c0d493bfb599d5.tar.bz2 spark-28ab17922a227e8d93654d3478c0d493bfb599d5.zip |
[SPARK-17260][MINOR] move CreateTables to HiveStrategies
## What changes were proposed in this pull request?
The `CreateTables` rule turns a general `CreateTable` plan into a `CreateHiveTableAsSelectCommand` for Hive serde tables. However, this rule is logically a planner strategy, so we should move it to `HiveStrategies` to be consistent with other DDL commands.
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #14825 from cloud-fan/ctas.
4 files changed, 27 insertions, 37 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 701b73a4aa..ff82c7f7af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } } - - /** - * Creates any tables required for query execution. - * For example, because of a CREATE TABLE X AS statement. - */ - object CreateTables extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => - val newTableDesc = if (tableDesc.storage.serde.isEmpty) { - // add default serde - tableDesc.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - tableDesc - } - - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc) - - // Currently we will never hit this branch, as SQL string API can only use `Ignore` or - // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde - // tables yet. 
- if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { - throw new AnalysisException("" + - "CTAS for hive serde tables does not support append or overwrite semantics.") - } - - execution.CreateHiveTableAsSelectCommand( - newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - query, - mode == SaveMode.Ignore) - } - } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index ca8c7347f2..86d3b6de0d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog( val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions - val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index a7cc7cc142..f3c4135da6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = catalog.ParquetConversions :: catalog.OrcConversions :: - catalog.CreateTables :: PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 17956ded17..fb11c849ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ private[hive] trait HiveStrategies { @@ -45,6 +47,31 @@ private[hive] trait HiveStrategies { case logical.InsertIntoTable( table: MetastoreRelation, partition, child, overwrite, ifNotExists) => InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil + + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => + val newTableDesc = if (tableDesc.storage.serde.isEmpty) { + // add default serde + tableDesc.withNewStorage( + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + } else { + tableDesc + } + + // Currently we will never hit this branch, as SQL string API can only use `Ignore` or + // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde + // tables yet. + if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { + throw new AnalysisException("" + + "CTAS for hive serde tables does not support append or overwrite semantics.") + } + + val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) + val cmd = CreateHiveTableAsSelectCommand( + newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), + query, + mode == SaveMode.Ignore) + ExecutedCommandExec(cmd) :: Nil + case _ => Nil } } |