diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-08-05 10:50:26 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-08-05 10:50:26 +0200 |
commit | 5effc016c893ce917d535cc1b5026d8e4c846721 (patch) | |
tree | 59e28575a90ec38a17b26dad58297c5d5bfd8436 /sql/hive/src | |
parent | faaefab26ffea3a5edfeaff42db222c8cd3ff5f1 (diff) | |
download | spark-5effc016c893ce917d535cc1b5026d8e4c846721.tar.gz spark-5effc016c893ce917d535cc1b5026d8e4c846721.tar.bz2 spark-5effc016c893ce917d535cc1b5026d8e4c846721.zip |
[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS
## What changes were proposed in this pull request?
we have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the complexity and centralize the error handling.
## How was this patch tested?
existing tests
Author: Wenchen Fan <wenchen@databricks.com>
Closes #14482 from cloud-fan/table.
Diffstat (limited to 'sql/hive/src')
4 files changed, 28 insertions, 15 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index db970785a7..c7c1acda25 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,15 +23,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -436,23 +434,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved. case p: LogicalPlan if !p.childrenResolved => p - case p: LogicalPlan if p.resolved => p - case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => - val desc = if (table.storage.serde.isEmpty) { + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => + val newTableDesc = if (tableDesc.storage.serde.isEmpty) { // add default serde - table.withNewStorage( + tableDesc.withNewStorage( serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { - table + tableDesc } - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc) + + // Currently we will never hit this branch, as SQL string API can only use `Ignore` or + // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde + // tables yet. + if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { + throw new AnalysisException("" + + "CTAS for hive serde tables does not support append or overwrite semantics.") + } execution.CreateHiveTableAsSelectCommand( - desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - child, - allowExisting) + newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), + query, + mode == SaveMode.Ignore) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 8773993d36..e01c053ab5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -65,6 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) catalog.ParquetConversions :: catalog.OrcConversions :: catalog.CreateTables :: + PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index e0c07db3b0..69a6884c7a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.types.StructType @@ -36,8 +37,7 @@ class HiveDDLCommandSuite extends PlanTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { - case c: CreateTableCommand => (c.table, c.ifNotExists) - case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting) + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) }.head } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d15e11a7ff..e078b58542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -141,6 +141,13 @@ class HiveDDLSuite } } + test("create table: partition column names exist in table definition") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)") + } + assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a") + } + test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => |