From 5effc016c893ce917d535cc1b5026d8e4c846721 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 5 Aug 2016 10:50:26 +0200
Subject: [SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS

## What changes were proposed in this pull request?

we have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the complexity and centralize the error handling.

## How was this patch tested?

existing tests

Author: Wenchen Fan

Closes #14482 from cloud-fan/table.
---
 .../spark/sql/catalyst/catalog/interface.scala      |  17 +--
 .../catalyst/catalog/ExternalCatalogSuite.scala     |   8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala      |  24 +--
 .../main/scala/org/apache/spark/sql/Dataset.scala   |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala        | 100 ++++++------
 .../spark/sql/execution/SparkStrategies.scala       |  59 ++++---
 .../execution/command/createDataSourceTables.scala  |  64 +-------
 .../spark/sql/execution/datasources/ddl.scala       |  49 ++----
 .../spark/sql/execution/datasources/rules.scala     | 170 ++++++++++++++++++---
 .../apache/spark/sql/internal/CatalogImpl.scala     |  46 +++---
 .../apache/spark/sql/internal/SessionState.scala    |   3 +-
 .../sql/execution/command/DDLCommandSuite.scala     | 151 ++++++++----------
 .../spark/sql/execution/command/DDLSuite.scala      |  47 +++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |  29 ++--
 .../apache/spark/sql/hive/HiveSessionState.scala    |   1 +
 .../spark/sql/hive/HiveDDLCommandSuite.scala        |   6 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala     |   7 +
 17 files changed, 417 insertions(+), 372 deletions(-)

(limited to 'sql')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 38f0bc2c4f..f7762e0f8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,7 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
*/ @@ -120,6 +121,7 @@ case class CatalogTable( tableType: CatalogTableType, storage: CatalogStorageFormat, schema: StructType, + provider: Option[String] = None, partitionColumnNames: Seq[String] = Seq.empty, bucketSpec: Option[BucketSpec] = None, owner: String = "", @@ -131,16 +133,6 @@ case class CatalogTable( comment: Option[String] = None, unsupportedFeatures: Seq[String] = Seq.empty) { - // Verify that the provided columns are part of the schema - private val colNames = schema.map(_.name).toSet - private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = { - require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " + - s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") - } - requireSubsetOfSchema(partitionColumnNames, "partition") - requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort") - requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket") - /** schema of this table's partition columns */ def partitionSchema: StructType = StructType(schema.filter { c => partitionColumnNames.contains(c.name) @@ -189,6 +181,7 @@ case class CatalogTable( s"Last Access: ${new Date(lastAccessTime).toString}", s"Type: ${tableType.name}", if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "", + if (provider.isDefined) s"Provider: ${provider.get}" else "", if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "" ) ++ bucketStrings ++ Seq( viewOriginalText.map("Original View: " + _).getOrElse(""), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 201d39a364..54365fd978 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac identifier = TableIdentifier("my_table", Some("db1")), tableType = CatalogTableType.MANAGED, storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), - schema = new StructType().add("a", "int").add("b", "string") + schema = new StructType().add("a", "int").add("b", "string"), + provider = Some("hive") ) catalog.createTable(table, ignoreIfExists = false) @@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac storage = CatalogStorageFormat( Some(Utils.createTempDir().getAbsolutePath), None, None, None, false, Map.empty), - schema = new StructType().add("a", "int").add("b", "string") + schema = new StructType().add("a", "int").add("b", "string"), + provider = Some("hive") ) catalog.createTable(externalTable, ignoreIfExists = false) assert(!exists(db.locationUri, "external_table")) @@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac .add("col2", "string") .add("a", "int") .add("b", "string"), + provider = Some("hive"), partitionColumnNames = Seq("a", "b") ) catalog.createTable(table, ignoreIfExists = false) @@ -692,6 +695,7 @@ abstract class CatalogTestUtils { .add("col2", "string") .add("a", "int") .add("b", "string"), + provider = Some("hive"), partitionColumnNames = Seq("a", "b"), bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil))) } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 44189881dd..6dbed26b0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -23,10 +23,11 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable -import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.types.StructType /** * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, @@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException(s"Table $tableIdent already exists.") case _ => - val cmd = - CreateTableUsingAsSelect( - tableIdent, - source, - partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), - getBucketSpec, - mode, - extraOptions.toMap, - df.logicalPlan) + val tableDesc = CatalogTable( + identifier = tableIdent, + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap), + schema = new StructType, + provider = Some(source), + partitionColumnNames = partitioningColumns.getOrElse(Nil), + bucketSpec = getBucketSpec + ) + val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan)) df.sparkSession.sessionState.executePlan(cmd).toRdd } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 263ee33742..9eef5cc5fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -24,7 +24,6 @@ import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import com.fasterxml.jackson.core.JsonFactory import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Experimental} @@ -35,18 +34,16 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.expressions.objects.Invoke import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} -import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CreateTable, 
LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} @@ -174,8 +171,7 @@ class Dataset[T] private[sql]( @transient private[sql] val logicalPlan: LogicalPlan = { def hasSideEffects(plan: LogicalPlan): Boolean = plan match { case _: Command | - _: InsertIntoTable | - _: CreateTableUsingAsSelect => true + _: InsertIntoTable => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 22b1e07219..2bb686254c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _} +import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.types.{DataType, StructType} @@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan. + * Create a [[CreateTable]] logical plan. */ override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) @@ -319,12 +319,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText + val schema = Option(ctx.colTypeList()).map(createStructType) val partitionColumnNames = Option(ctx.partitionColumnNames) .map(visitIdentifierList(_).toArray) .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) + val tableDesc = CatalogTable( + identifier = table, + // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the + // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a + // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate + // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and + // make it take `CatalogTable` directly. + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = options), + schema = schema.getOrElse(new StructType), + provider = Some(provider), + partitionColumnNames = partitionColumnNames, + bucketSpec = bucketSpec + ) + + // Determine the storage mode. + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + if (ctx.query != null) { // Get the backing query. val query = plan(ctx.query) @@ -333,32 +352,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx) } - // Determine the storage mode. 
- val mode = if (ifNotExists) { - SaveMode.Ignore - } else { - SaveMode.ErrorIfExists - } - - CreateTableUsingAsSelect( - table, provider, partitionColumnNames, bucketSpec, mode, options, query) + CreateTable(tableDesc, mode, Some(query)) } else { - val struct = Option(ctx.colTypeList()).map(createStructType) - if (struct.isEmpty && bucketSpec.nonEmpty) { - throw new ParseException( - "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx) - } + if (temp) { + if (ifNotExists) { + operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx) + } - CreateTableUsing( - table, - struct, - provider, - temp, - options, - partitionColumnNames, - bucketSpec, - ifNotExists, - managedIfNoPath = true) + logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " + + "CREATE TEMPORARY VIEW ... USING ... instead") + CreateTempViewUsing(table, schema, replace = true, provider, options) + } else { + CreateTable(tableDesc, mode, None) + } } } @@ -891,8 +897,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning either a [[CreateTableCommand]] or a - * [[CreateHiveTableAsSelectLogicalPlan]]. + * Create a table, returning a [[CreateTable]] logical plan. * * This is not used to create datasource tables, which is handled through * "CREATE TABLE ... USING ...". @@ -933,23 +938,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val selectQuery = Option(ctx.query).map(plan) - // Ensuring whether no duplicate name is used in table definition - val colNames = dataCols.map(_.name) - if (colNames.length != colNames.distinct.length) { - val duplicateColumns = colNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - } - operationNotAllowed(s"Duplicated column names found in table definition of $name: " + - duplicateColumns.mkString("[", ",", "]"), ctx) - } - - // For Hive tables, partition columns must not be part of the schema - val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet) - if (badPartCols.nonEmpty) { - operationNotAllowed(s"Partition columns may not be specified in the schema: " + - badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx) - } - // Note: Hive requires partition columns to be distinct from the schema, so we need // to include the partition columns here explicitly val schema = StructType(dataCols ++ partitionCols) @@ -1001,10 +989,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { tableType = tableType, storage = storage, schema = schema, + provider = Some("hive"), partitionColumnNames = partitionCols.map(_.name), properties = properties, comment = comment) + val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + selectQuery match { case Some(q) => // Just use whatever is projected in the select statement as our schema @@ -1025,7 +1016,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null) if (conf.convertCTAS && !hasStorageProperties) { - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties // are empty Maps. 
val optionsWithPath = if (location.isDefined) { @@ -1033,19 +1023,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } else { Map.empty[String, String] } - CreateTableUsingAsSelect( - tableIdent = tableDesc.identifier, - provider = conf.defaultDataSourceName, - partitionColumns = tableDesc.partitionColumnNames.toArray, - bucketSpec = None, - mode = mode, - options = optionsWithPath, - q + + val newTableDesc = tableDesc.copy( + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), + provider = Some(conf.defaultDataSourceName) ) + + CreateTable(newTableDesc, mode, Some(q)) } else { - CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) + CreateTable(tableDesc, mode, Some(q)) } - case None => CreateTableCommand(tableDesc, ifNotExists) + case None => CreateTable(tableDesc, mode, None) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 52e19819f2..fb08e1228e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -19,15 +19,15 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Strategy} +import org.apache.spark.sql.{execution, SaveMode, Strategy} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ @@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case c: CreateTableUsing if c.temporary && !c.allowExisting => - logWarning( - s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " + - s"please use CREATE TEMPORARY VIEW viewName USING... 
instead") - ExecutedCommandExec( - CreateTempViewUsing( - c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil - - case c: CreateTableUsing if !c.temporary => + case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" => + val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) + ExecutedCommandExec(cmd) :: Nil + + case CreateTable(tableDesc, mode, None) => val cmd = CreateDataSourceTableCommand( - c.tableIdent, - c.userSpecifiedSchema, - c.provider, - c.options, - c.partitionColumns, - c.bucketSpec, - c.allowExisting, - c.managedIfNoPath) + tableDesc.identifier, + if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None, + tableDesc.provider.get, + tableDesc.storage.properties, + tableDesc.partitionColumnNames.toArray, + tableDesc.bucketSpec, + ignoreIfExists = mode == SaveMode.Ignore, + managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED) ExecutedCommandExec(cmd) :: Nil - case c: CreateTableUsing if c.temporary && c.allowExisting => - throw new AnalysisException( - "allowExisting should be set to false when creating a temporary table.") + // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule + // `CreateTables` - case c: CreateTableUsingAsSelect => + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" => val cmd = CreateDataSourceTableAsSelectCommand( - c.tableIdent, - c.provider, - c.partitionColumns, - c.bucketSpec, - c.mode, - c.options, - c.child) + tableDesc.identifier, + tableDesc.provider.get, + tableDesc.partitionColumnNames.toArray, + tableDesc.bucketSpec, + mode, + tableDesc.storage.properties, + query) ExecutedCommandExec(cmd) :: Nil - case c: CreateTempViewUsing => - ExecutedCommandExec(c) :: Nil + case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 93eb386ade..7b028e72ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.command -import java.util.regex.Pattern - import scala.collection.mutable import scala.util.control.NonFatal @@ -59,21 +57,6 @@ case class CreateDataSourceTableCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - // Since we are saving metadata to metastore, we need to check if metastore supports - // the table name and database name we have for this query. MetaStoreUtils.validateName - // is the method used by Hive to check if a table name or a database name is valid for - // the metastore. - if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) { - throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } - if (tableIdent.database.isDefined && - !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) { - throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + - s"for metastore. 
Metastore only accepts database name containing " + - s"characters, numbers and _.") - } - val tableName = tableIdent.unquotedString val sessionState = sparkSession.sessionState @@ -106,22 +89,12 @@ case class CreateDataSourceTableCommand( val partitionColumns = if (userSpecifiedSchema.nonEmpty) { userSpecifiedPartitionColumns } else { - val res = dataSource match { + // This is guaranteed in `PreprocessDDL`. + assert(userSpecifiedPartitionColumns.isEmpty) + dataSource match { case r: HadoopFsRelation => r.partitionSchema.fieldNames case _ => Array.empty[String] } - if (userSpecifiedPartitionColumns.length > 0) { - // The table does not have a specified schema, which means that the schema will be inferred - // when we load the table. So, we are not expecting partition columns and we will discover - // partitions when we load the table. However, if there are specified partition columns, - // we simply ignore them and provide a warning message. - logWarning( - s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " + - s"ignored. The schema and partition columns of table $tableIdent are inferred. " + - s"Schema: ${dataSource.schema.simpleString}; " + - s"Partition columns: ${res.mkString("(", ", ", ")")}") - } - res } CreateDataSourceTableUtils.createDataSourceTable( @@ -164,21 +137,6 @@ case class CreateDataSourceTableAsSelectCommand( override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { - // Since we are saving metadata to metastore, we need to check if metastore supports - // the table name and database name we have for this query. MetaStoreUtils.validateName - // is the method used by Hive to check if a table name or a database name is valid for - // the metastore. - if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) { - throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } - if (tableIdent.database.isDefined && - !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) { - throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + - s"for metastore. Metastore only accepts database name containing " + - s"characters, numbers and _.") - } - val tableName = tableIdent.unquotedString val sessionState = sparkSession.sessionState var createMetastoreTable = false @@ -311,20 +269,6 @@ object CreateDataSourceTableUtils extends Logging { val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." - /** - * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), - * i.e. if this name only contains characters, numbers, and _. - * - * This method is intended to have the same behavior of - * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName. 
- */ - def validateName(name: String): Boolean = { - val tpat = Pattern.compile("[\\w_]+") - val matcher = tpat.matcher(name) - - matcher.matches() - } - def createDataSourceTable( sparkSession: SparkSession, tableIdent: TableIdentifier, @@ -396,6 +340,7 @@ object CreateDataSourceTableUtils extends Logging { identifier = tableIdent, tableType = tableType, schema = new StructType, + provider = Some(provider), storage = CatalogStorageFormat( locationUri = None, inputFormat = None, @@ -425,6 +370,7 @@ object CreateDataSourceTableUtils extends Logging { properties = options ), schema = relation.schema, + provider = Some(provider), properties = tableProperties.toMap, viewText = None) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 18369b51b9..1b1e2123b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -19,50 +19,25 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types._ +case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan]) + extends LogicalPlan with Command { + assert(tableDesc.provider.isDefined, "The table to be created must have a provider.") -/** - * Used to represent the operation of create table using a data source. - * - * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - */ -case class CreateTableUsing( - tableIdent: TableIdentifier, - userSpecifiedSchema: Option[StructType], - provider: String, - temporary: Boolean, - options: Map[String, String], - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - allowExisting: Boolean, - managedIfNoPath: Boolean) extends LogicalPlan with logical.Command { - - override def output: Seq[Attribute] = Seq.empty - override def children: Seq[LogicalPlan] = Seq.empty -} + if (query.isEmpty) { + assert( + mode == SaveMode.ErrorIfExists || mode == SaveMode.Ignore, + "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.") + } -/** - * A node used to support CTAS statements and saveAsTable for the data source API. - * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] because we want the - * analyzer can analyze the logical plan that will be used to populate the table. - * So, [[PreWriteCheck]] can detect cases that are not allowed. 
- */ -case class CreateTableUsingAsSelect( - tableIdent: TableIdentifier, - provider: String, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - mode: SaveMode, - options: Map[String, String], - child: LogicalPlan) extends logical.UnaryNode { override def output: Seq[Attribute] = Seq.empty[Attribute] + + override def children: Seq[LogicalPlan] = query.toSeq } case class CreateTempViewUsing( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 15b9d14bd7..d5b92323d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -17,17 +17,21 @@ package org.apache.spark.sql.execution.datasources +import java.util.regex.Pattern + import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} +import org.apache.spark.sql.types.{AtomicType, StructType} /** * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]]. @@ -61,6 +65,130 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo } } +/** + * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking. + */ +case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // When we CREATE TABLE without specifying the table schema, we should fail the query if + // bucketing information is specified, as we can't infer bucketing from data files currently, + // and we should ignore the partition columns if it's specified, as we will infer it later, at + // runtime. + case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => + if (tableDesc.bucketSpec.isDefined) { + failAnalysis("Cannot specify bucketing information if the table schema is not specified " + + "when creating and will be inferred at runtime") + } + + val partitionColumnNames = tableDesc.partitionColumnNames + if (partitionColumnNames.nonEmpty) { + // The table does not have a specified schema, which means that the schema will be inferred + // at runtime. So, we are not expecting partition columns and we will discover partitions + // at runtime. However, if there are specified partition columns, we simply ignore them and + // provide a warning message. + logWarning( + s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " + + s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " + + "be inferred.") + c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil)) + } else { + c + } + + // Here we normalize partition, bucket and sort column names, w.r.t. 
the case sensitivity + // config, and do various checks: + // * column names in table definition can't be duplicated. + // * partition, bucket and sort column names must exist in table definition. + // * partition, bucket and sort column names can't be duplicated. + // * can't use all table columns as partition columns. + // * partition columns' type must be AtomicType. + // * sort columns' type must be orderable. + case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved => + val schema = if (query.isDefined) query.get.schema else tableDesc.schema + checkDuplication(schema.map(_.name), "table definition of " + tableDesc.identifier) + + val partitionColsChecked = checkPartitionColumns(schema, tableDesc) + val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked) + c.copy(tableDesc = bucketColsChecked) + } + + private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = { + val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName => + normalizeColumnName(tableDesc.identifier, schema, colName, "partition") + } + checkDuplication(normalizedPartitionCols, "partition") + + if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) { + if (tableDesc.provider.get == "hive") { + // When we hit this branch, it means users didn't specify schema for the table to be + // created, as we always include partition columns in table schema for hive serde tables. + // The real schema will be inferred at hive metastore by hive serde, plus the given + // partition columns, so we should not fail the analysis here. + } else { + failAnalysis("Cannot use all columns for partition columns") + } + + } + + schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach { + case _: AtomicType => // OK + case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column") + } + + tableDesc.copy(partitionColumnNames = normalizedPartitionCols) + } + + private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = { + tableDesc.bucketSpec match { + case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => + val normalizedBucketCols = bucketColumnNames.map { colName => + normalizeColumnName(tableDesc.identifier, schema, colName, "bucket") + } + checkDuplication(normalizedBucketCols, "bucket") + + val normalizedSortCols = sortColumnNames.map { colName => + normalizeColumnName(tableDesc.identifier, schema, colName, "sort") + } + checkDuplication(normalizedSortCols, "sort") + + schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach { + case dt if RowOrdering.isOrderable(dt) => // OK + case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column") + } + + tableDesc.copy( + bucketSpec = Some(BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)) + ) + + case None => tableDesc + } + } + + private def checkDuplication(colNames: Seq[String], colType: String): Unit = { + if (colNames.distinct.length != colNames.length) { + val duplicateColumns = colNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => x + } + failAnalysis(s"Found duplicate column(s) in $colType: ${duplicateColumns.mkString(", ")}") + } + } + + private def normalizeColumnName( + tableIdent: TableIdentifier, + schema: StructType, + colName: String, + colType: String): String = { + val tableCols = schema.map(_.name) + tableCols.find(conf.resolver(_, colName)).getOrElse { + failAnalysis(s"$colType column $colName is not defined in 
table $tableIdent, " + + s"defined table columns are: ${tableCols.mkString(", ")}") + } + } + + private def failAnalysis(msg: String) = throw new AnalysisException(msg) +} + /** * Preprocess the [[InsertIntoTable]] plan. Throws exception if the number of columns mismatch, or * specified partition columns are different from the existing partition columns in the target @@ -152,8 +280,25 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } + // This regex is used to check if the table name and database name is valid for `CreateTable`. + private val validNameFormat = Pattern.compile("[\\w_]+") + def apply(plan: LogicalPlan): Unit = { plan.foreach { + case c @ CreateTable(tableDesc, mode, query) if c.resolved => + // Since we are saving table metadata to metastore, we should make sure the table name + // and database name don't break some common restrictions, e.g. special chars except + // underscore are not allowed. + val tblIdent = tableDesc.identifier + if (!validNameFormat.matcher(tblIdent.table).matches()) { + failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " + + s"metastore. Metastore only accepts table name containing characters, numbers and _.") + } + if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) { + failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " + + s"metastore. Metastore only accepts table name containing characters, numbers and _.") + } + case i @ logical.InsertIntoTable( l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, overwrite, ifNotExists) => @@ -206,22 +351,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") - case c: CreateTableUsingAsSelect => + case CreateTable(tableDesc, mode, Some(query)) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) { + if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) { // Need to remove SubQuery operator. - EliminateSubqueryAliases(catalog.lookupRelation(c.tableIdent)) match { + EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match { // Only do the check if the table is a data source table // (the relation is a BaseRelation). case l @ LogicalRelation(dest: BaseRelation, _, _) => // Get all input data source relations of the query. 
- val srcRelations = c.child.collect { + val srcRelations = query.collect { case LogicalRelation(src: BaseRelation, _, _) => src } if (srcRelations.contains(dest)) { failAnalysis( - s"Cannot overwrite table ${c.tableIdent} that is also being read from.") + s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.") } else { // OK } @@ -232,19 +377,6 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumn( - c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) - - for { - spec <- c.bucketSpec - sortColumnName <- spec.sortColumnNames - sortColumn <- c.child.schema.find(_.name == sortColumnName) - } { - if (!RowOrdering.isOrderable(sortColumn.dataType)) { - failAnalysis(s"Cannot use ${sortColumn.dataType.simpleString} for sorting column.") - } - } - case _ => // OK } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index f8f78723b9..1f87f0e73a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -21,13 +21,13 @@ import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.execution.datasources.CreateTableUsing +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.types.StructType @@ -223,20 +223,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { tableName: String, source: String, options: Map[String, String]): DataFrame = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - val cmd = - CreateTableUsing( - tableIdent, - userSpecifiedSchema = None, - source, - temporary = false, - options = options, - partitionColumns = Array.empty[String], - bucketSpec = None, - allowExisting = false, - managedIfNoPath = false) - sparkSession.sessionState.executePlan(cmd).toRdd - sparkSession.table(tableIdent) + createExternalTable(tableName, source, new StructType, options) } /** @@ -271,19 +258,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { source: String, schema: StructType, options: Map[String, String]): DataFrame = { + if (source == "hive") { + throw new AnalysisException("Cannot create hive serde table with createExternalTable API.") + } + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - val cmd = - CreateTableUsing( - tableIdent, - userSpecifiedSchema = Some(schema), - source, - temporary = false, - options, - partitionColumns = Array.empty[String], - bucketSpec = None, - allowExisting = false, - managedIfNoPath = false) - sparkSession.sessionState.executePlan(cmd).toRdd + val tableDesc = CatalogTable( + identifier = tableIdent, + tableType = 
CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(properties = options), + schema = schema, + provider = Some(source) + ) + val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None) + sparkSession.sessionState.executePlan(plan).toRdd sparkSession.table(tableIdent) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index a228566b6b..052bce0923 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand -import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreprocessTableInsertion, ResolveDataSource} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager @@ -111,6 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = + PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: new FindDataSourceTable(sparkSession) :: DataSourceAnalysis(conf) :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 999afc9751..044fa5fb9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -20,13 +20,12 @@ package org.apache.spark.sql.execution.command import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource} -import org.apache.spark.sql.catalyst.catalog.FunctionResourceType +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.execution.SparkSqlParser -import org.apache.spark.sql.execution.datasources.CreateTableUsing +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} @@ -243,12 +242,12 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab STORED AS $s" - val ct = parseAs[CreateTableCommand](query) + val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } } @@ -259,14 +258,14 @@ class DDLCommandSuite extends 
PlanTest { val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat" // No conflicting serdes here, OK - val parsed1 = parseAs[CreateTableCommand](query1) - assert(parsed1.table.storage.serde == Some("anything")) - assert(parsed1.table.storage.inputFormat == Some("inputfmt")) - assert(parsed1.table.storage.outputFormat == Some("outputfmt")) - val parsed2 = parseAs[CreateTableCommand](query2) - assert(parsed2.table.storage.serde.isEmpty) - assert(parsed2.table.storage.inputFormat == Some("inputfmt")) - assert(parsed2.table.storage.outputFormat == Some("outputfmt")) + val parsed1 = parseAs[CreateTable](query1) + assert(parsed1.tableDesc.storage.serde == Some("anything")) + assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) + assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) + val parsed2 = parseAs[CreateTable](query2) + assert(parsed2.tableDesc.storage.serde.isEmpty) + assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) + assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt")) } test("create table - row format serde and generic file format") { @@ -276,12 +275,12 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" if (supportedSources.contains(s)) { - val ct = parseAs[CreateTableCommand](query) + val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == Some("anything")) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.tableDesc.storage.serde == Some("anything")) + assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format serde", "incompatible", s)) } @@ -295,12 +294,12 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" if (supportedSources.contains(s)) { - val ct = parseAs[CreateTableCommand](query) + val ct = parseAs[CreateTable](query) val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) assert(hiveSerde.isDefined) - assert(ct.table.storage.serde == hiveSerde.get.serde) - assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat) - assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat) + assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) + assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) + assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat) } else { assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s)) } @@ -312,9 +311,9 @@ class DDLCommandSuite extends PlanTest { sql = "CREATE EXTERNAL TABLE my_tab", containsThesePhrases = Seq("create external table", "location")) val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'" - val ct = parseAs[CreateTableCommand](query) - assert(ct.table.tableType == CatalogTableType.EXTERNAL) - assert(ct.table.storage.locationUri == Some("/something/anything")) + val ct = parseAs[CreateTable](query) + assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) + assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } test("create table - property values must be set") { @@ -329,47 +328,29 @@ class 
DDLCommandSuite extends PlanTest { test("create table - location implies external") { val query = "CREATE TABLE my_tab LOCATION '/something/anything'" - val ct = parseAs[CreateTableCommand](query) - assert(ct.table.tableType == CatalogTableType.EXTERNAL) - assert(ct.table.storage.locationUri == Some("/something/anything")) - } - - test("create table - column repeated in partitioning columns") { - val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)" - val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.getMessage.contains( - "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]")) - } - - test("create table - duplicate column names in the table definition") { - val query = "CREATE TABLE default.tab1 (key INT, key STRING)" - val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " + - "table definition of `default`.`tab1`: [\"key\"]")) + val ct = parseAs[CreateTable](query) + assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) + assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) } test("create table using - with partitioned by") { val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + "USING parquet PARTITIONED BY (a)" - val expected = CreateTableUsing( - TableIdentifier("my_tab"), - Some(new StructType() + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType() .add("a", IntegerType, nullable = true, "test") - .add("b", StringType)), - "parquet", - false, - Map.empty, - null, - None, - false, - true) + .add("b", StringType), + provider = Some("parquet"), + partitionColumnNames = Seq("a") + ) parser.parsePlan(query) match { - case ct: CreateTableUsing => - // We can't compare array in `CreateTableUsing` directly, so here we compare - // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison. - assert(Seq("a") == ct.partitionColumns.toSeq) - comparePlans(ct.copy(partitionColumns = null), expected) + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + s"got ${other.getClass.getName}: $query") @@ -379,23 +360,19 @@ class DDLCommandSuite extends PlanTest { test("create table using - with bucket") { val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" - val expected = CreateTableUsing( - TableIdentifier("my_tab"), - Some(new StructType().add("a", IntegerType).add("b", StringType)), - "parquet", - false, - Map.empty, - null, - Some(BucketSpec(5, Seq("a"), Seq("b"))), - false, - true) + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("my_tab"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", IntegerType).add("b", StringType), + provider = Some("parquet"), + bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b"))) + ) parser.parsePlan(query) match { - case ct: CreateTableUsing => - // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null before - // plan comparison. 
- assert(ct.partitionColumns.isEmpty) - comparePlans(ct.copy(partitionColumns = null), expected) + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + s"got ${other.getClass.getName}: $query") @@ -907,22 +884,20 @@ class DDLCommandSuite extends PlanTest { |CREATE TABLE table_name USING json |OPTIONS (a 1, b 0.1, c TRUE) """.stripMargin - val expected = CreateTableUsing( - TableIdentifier("table_name"), - None, - "json", - false, - Map("a" -> "1", "b" -> "0.1", "c" -> "true"), - null, - None, - false, - true) + + val expectedTableDesc = CatalogTable( + identifier = TableIdentifier("table_name"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy( + properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true") + ), + schema = new StructType, + provider = Some("json") + ) parser.parsePlan(sql) match { - case ct: CreateTableUsing => - // We can't compare array in `CreateTableUsing` directly, so here we explicitly - // set partitionColumns to `null` and then compare it. - comparePlans(ct.copy(partitionColumns = null), expected) + case CreateTable(tableDesc, _, None) => + assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + s"got ${other.getClass.getName}: $sql") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 564fc73ee7..ca9b210125 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -94,6 +93,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .add("col2", "string") .add("a", "int") .add("b", "int"), + provider = Some("parquet"), partitionColumnNames = Seq("a", "b"), createTime = 0L) } @@ -359,6 +359,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("create table - duplicate column names in the table definition") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int, a string) USING json") + } + assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a") + } + + test("create table - partition column names not in table definition") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)") + } + assert(e.message == "partition column c is not defined in table `tbl`, " + + "defined table columns are: a, b") + } + + test("create table - bucket column names not in table definition") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) 
INTO 4 BUCKETS") + } + assert(e.message == "bucket column c is not defined in table `tbl`, " + + "defined table columns are: a, b") + } + + test("create table - column repeated in partition columns") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)") + } + assert(e.message == "Found duplicate column(s) in partition: a") + } + + test("create table - column repeated in bucket columns") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS") + } + assert(e.message == "Found duplicate column(s) in bucket: a") + } + test("Describe Table with Corrupted Schema") { import testImplicits._ @@ -1469,7 +1506,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTable("jsonTable") { (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath) - val e = intercept[ParseException] { + val e = intercept[AnalysisException] { sql( s""" |CREATE TABLE jsonTable @@ -1479,9 +1516,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |) |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS """.stripMargin) - }.getMessage - assert(e.contains( - "Expected explicit specification of table schema when using CLUSTERED BY clause")) + } + assert(e.message == "Cannot specify bucketing information if the table schema is not " + + "specified when creating and will be inferred at runtime") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index db970785a7..c7c1acda25 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,15 +23,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -436,23 +434,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Wait until children are resolved. 
case p: LogicalPlan if !p.childrenResolved => p - case p: LogicalPlan if p.resolved => p - case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => - val desc = if (table.storage.serde.isEmpty) { + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => + val newTableDesc = if (tableDesc.storage.serde.isEmpty) { // add default serde - table.withNewStorage( + tableDesc.withNewStorage( serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) } else { - table + tableDesc } - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) + val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc) + + // Currently we will never hit this branch, as SQL string API can only use `Ignore` or + // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde + // tables yet. + if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { + throw new AnalysisException("" + + "CTAS for hive serde tables does not support append or overwrite semantics.") + } execution.CreateHiveTableAsSelectCommand( - desc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - child, - allowExisting) + newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), + query, + mode == SaveMode.Ignore) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 8773993d36..e01c053ab5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -65,6 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) catalog.ParquetConversions :: catalog.OrcConversions :: catalog.CreateTables :: + PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index e0c07db3b0..69a6884c7a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.types.StructType @@ -36,8 +37,7 @@ class HiveDDLCommandSuite extends PlanTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { - case c: CreateTableCommand => (c.table, c.ifNotExists) - case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting) + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) }.head } diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d15e11a7ff..e078b58542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -141,6 +141,13 @@ class HiveDDLSuite } } + test("create table: partition column names exist in table definition") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)") + } + assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a") + } + test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => -- cgit v1.2.3
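For reference, a minimal sketch of how the unified plan surfaces through the public API after this patch. It assumes a local Spark 2.x build that includes this change, with the default in-memory catalog and warehouse directory; the table names are illustrative. Both the `DataFrameWriter.saveAsTable` path and the SQL parser path now produce the same `CreateTable(tableDesc, mode, query)` logical plan, and `DDLStrategy` maps it to the appropriate command depending on the provider and on whether a query is attached.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object UnifiedCreateTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("unified-create-table-sketch")
      .getOrCreate()
    import spark.implicits._

    // DataFrameWriter.saveAsTable now builds a CatalogTable description (provider = Some("parquet"))
    // and wraps it in CreateTable(tableDesc, mode, Some(query)).
    Seq((1, "a"), (2, "b")).toDF("id", "name")
      .write
      .format("parquet")
      .mode(SaveMode.ErrorIfExists)
      .saveAsTable("t_ctas_api")

    // CREATE TABLE ... USING ... AS SELECT goes through SparkSqlAstBuilder.visitCreateTableUsing
    // and yields the same CreateTable plan; DDLStrategy plans both as
    // CreateDataSourceTableAsSelectCommand because the provider is not "hive".
    spark.sql("CREATE TABLE t_ctas_sql USING parquet AS SELECT 1 AS id, 'a' AS name")

    // Without a query the plan is CreateTable(tableDesc, mode, None), which is planned as
    // CreateDataSourceTableCommand (or CreateTableCommand when the provider is "hive").
    spark.sql("CREATE TABLE t_plain(id INT, name STRING) USING parquet PARTITIONED BY (name)")

    spark.table("t_ctas_api").show()
    spark.stop()
  }
}
```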
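The new `PreprocessDDL` rule and the extended `PreWriteCheck` move several CREATE TABLE checks out of the parser and into analysis, so they now surface as `AnalysisException` rather than `ParseException`. Below is a small sketch of the behaviors exercised by the tests added to DDLSuite; the table names and the JSON path are illustrative, and the quoted messages mirror the assertions added in this patch.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object CreateTableChecksSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("create-table-checks-sketch")
      .getOrCreate()

    // Runs a DDL statement that is expected to be rejected during analysis.
    def expectFailure(statement: String): Unit =
      try {
        spark.sql(statement)
        println(s"unexpectedly succeeded: $statement")
      } catch {
        case e: AnalysisException => println(e.getMessage)
      }

    // Duplicate column names in the table definition.
    // Expected: "Found duplicate column(s) in table definition of `tbl`: a"
    expectFailure("CREATE TABLE tbl(a int, a string) USING json")

    // Partition column that is not part of the table definition.
    // Expected: "partition column c is not defined in table `tbl`, defined table columns are: a, b"
    expectFailure("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")

    // Bucketing without an explicit schema: the schema would have to be inferred at runtime,
    // so the bucketing columns cannot be validated and the statement is rejected.
    expectFailure(
      "CREATE TABLE tbl2 USING json OPTIONS (path '/tmp/some.json') " +
        "CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS")

    spark.stop()
  }
}
```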