From 8e740ae44d55570a3e7b6eae1f0239ac1319b986 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 1 Sep 2016 16:45:22 +0800
Subject: [SPARK-17257][SQL] the physical plan of CREATE TABLE or CTAS should take CatalogTable

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/14482. Since the logical plan now carries `CatalogTable` directly, it makes sense to let the physical plans take `CatalogTable` directly as well, instead of extracting individual fields from `CatalogTable` in the planner and then constructing a new `CatalogTable` in the physical plan.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #14823 from cloud-fan/create-table.
---
 .../org/apache/spark/sql/DataFrameWriter.scala      |  10 +-
 .../spark/sql/execution/SparkSqlParser.scala        |  15 ++-
 .../spark/sql/execution/SparkStrategies.scala       |  16 +--
 .../execution/command/createDataSourceTables.scala  | 135 +++++++++------------
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |   6 +-
 5 files changed, 78 insertions(+), 104 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a9049a60f2..c05c7a6551 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.StructType
@@ -368,9 +368,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")

       case _ =>
+        val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+          CatalogTableType.EXTERNAL
+        } else {
+          CatalogTableType.MANAGED
+        }
+
         val tableDesc = CatalogTable(
           identifier = tableIdent,
-          tableType = CatalogTableType.EXTERNAL,
+          tableType = tableType,
           storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
           schema = new StructType,
           provider = Some(source),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 656494d97d..8fc1a8595a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -325,14 +325,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

+    // TODO: this may be wrong for non-file-based data sources like JDBC, which should be external
+    // even if there is no `path` in the options. We should consider allowing the EXTERNAL keyword.
+    val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
+
     val tableDesc = CatalogTable(
       identifier = table,
-      // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
-      // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
-      // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
-      // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
-      // make it take `CatalogTable` directly.
-      tableType = CatalogTableType.MANAGED,
+      tableType = tableType,
       storage = CatalogStorageFormat.empty.copy(properties = options),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4aaf454285..b4899ad688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -424,15 +424,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

       case CreateTable(tableDesc, mode, None) =>
         val cmd =
-          CreateDataSourceTableCommand(
-            tableDesc.identifier,
-            if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
-            tableDesc.provider.get,
-            tableDesc.storage.properties,
-            tableDesc.partitionColumnNames.toArray,
-            tableDesc.bucketSpec,
-            ignoreIfExists = mode == SaveMode.Ignore,
-            managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
+          CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil

       // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
@@ -441,12 +433,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
-            tableDesc.identifier,
-            tableDesc.provider.get,
-            tableDesc.partitionColumnNames.toArray,
-            tableDesc.bucketSpec,
+            tableDesc,
             mode,
-            tableDesc.storage.properties,
             query)
         ExecutedCommandExec(cmd) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7400a0e7bb..da3f6c600a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -40,71 +40,56 @@ import org.apache.spark.sql.types._
  * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
  * }}}
  */
-case class CreateDataSourceTableCommand(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String],
-    userSpecifiedPartitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    ignoreIfExists: Boolean,
-    managedIfNoPath: Boolean)
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
   extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = tableIdent.unquotedString
-    val sessionState = sparkSession.sessionState
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)

-    if (sessionState.catalog.tableExists(tableIdent)) {
+    val sessionState = sparkSession.sessionState
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table $tableName already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }

-    var isExternal = true
-    val optionsWithPath =
-      if (!new CaseInsensitiveMap(options).contains("path") && managedIfNoPath) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
-      } else {
-        options
-      }
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }

-    // Create the relation to validate the arguments before writing the metadata to the metastore.
+    // Create the relation to validate the arguments before writing the metadata to the metastore,
+    // and infer the table schema and partitioning if users didn't specify a schema in CREATE TABLE.
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = provider,
-        bucketSpec = None,
+        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        className = table.provider.get,
+        bucketSpec = table.bucketSpec,
         options = optionsWithPath).resolveRelation(checkPathExist = false)

-    val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
-      userSpecifiedPartitionColumns
+    val partitionColumnNames = if (table.schema.nonEmpty) {
+      table.partitionColumnNames
     } else {
       // This is guaranteed in `PreprocessDDL`.
-      assert(userSpecifiedPartitionColumns.isEmpty)
+      assert(table.partitionColumnNames.isEmpty)
       dataSource match {
-        case r: HadoopFsRelation => r.partitionSchema.fieldNames
-        case _ => Array.empty[String]
+        case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
+        case _ => Nil
       }
     }

-    val table = CatalogTable(
-      identifier = tableIdent,
-      tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+    val newTable = table.copy(
+      storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      provider = Some(provider),
-      partitionColumnNames = partitionColumns,
-      bucketSpec = bucketSpec
-    )
-
+      partitionColumnNames = partitionColumnNames)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
-    sessionState.catalog.createTable(table, ignoreIfExists = false)
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     Seq.empty[Row]
   }
 }
@@ -112,7 +97,7 @@ case class CreateDataSourceTableCommand(
 /**
  * A command used to create a data source table using the result of a query.
  *
- * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
+ * Note: This is different from `CreateHiveTableAsSelectCommand`. Please check the syntax for
  * difference. This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
@@ -123,32 +108,31 @@ case class CreateDataSourceTableCommand(
  */
 case class CreateDataSourceTableAsSelectCommand(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
+    table: CatalogTable,
     mode: SaveMode,
-    options: Map[String, String],
     query: LogicalPlan)
   extends RunnableCommand {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = tableIdent.unquotedString
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    assert(table.schema.isEmpty)
+
+    val tableName = table.identifier.unquotedString
+    val provider = table.provider.get
     val sessionState = sparkSession.sessionState
-    var createMetastoreTable = false
-    var isExternal = true
-    val optionsWithPath =
-      if (!new CaseInsensitiveMap(options).contains("path")) {
-        isExternal = false
-        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
-      } else {
-        options
-      }
+    val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+      table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.properties
+    }
+
+    var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
+    if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -165,21 +149,21 @@ case class CreateDataSourceTableAsSelectCommand(
         val dataSource = DataSource(
           sparkSession = sparkSession,
           userSpecifiedSchema = Some(query.schema.asNullable),
-          partitionColumns = partitionColumns,
-          bucketSpec = bucketSpec,
+          partitionColumns = table.partitionColumnNames,
+          bucketSpec = table.bucketSpec,
           className = provider,
           options = optionsWithPath)
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).
         EliminateSubqueryAliases(
-          sessionState.catalog.lookupRelation(tableIdent)) match {
+          sessionState.catalog.lookupRelation(table.identifier)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
             // check if the file formats match
             l.relation match {
               case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
                 throw new AnalysisException(
-                  s"The file format of the existing table $tableIdent is " +
+                  s"The file format of the existing table $tableName is " +
                     s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
                     s"format `$provider`")
               case _ =>
@@ -216,36 +200,29 @@ case class CreateDataSourceTableAsSelectCommand(
     val dataSource = DataSource(
       sparkSession,
       className = provider,
-      partitionColumns = partitionColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
       options = optionsWithPath)

     val result = try {
       dataSource.write(mode, df)
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        logError(s"Failed to write to table $tableName in $mode mode", ex)
         throw ex
     }
     if (createMetastoreTable) {
-      // We will use the schema of resolved.relation as the schema of the table (instead of
-      // the schema of df). It is important since the nullability may be changed by the relation
-      // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      val schema = result.schema
-      val table = CatalogTable(
-        identifier = tableIdent,
-        tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
-        storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
-        schema = schema,
-        provider = Some(provider),
-        partitionColumnNames = partitionColumns,
-        bucketSpec = bucketSpec
-      )
-      sessionState.catalog.createTable(table, ignoreIfExists = false)
+      val newTable = table.copy(
+        storage = table.storage.copy(properties = optionsWithPath),
+        // We will use the schema of resolved.relation as the schema of the table (instead of
+        // the schema of df). It is important since the nullability may be changed by the relation
+        // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
+        schema = result.schema)
+      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }

     // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(table.identifier)
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d77bb5cf95..7a71475a2f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -906,7 +906,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val e = intercept[AnalysisException] {
         createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
       }
-      assert(e.getMessage.contains("The file format of the existing table `appendOrcToParquet` " +
+      assert(e.getMessage.contains("The file format of the existing table appendOrcToParquet " +
         "is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " +
         "It doesn't match the specified format `orc`"))
     }
@@ -917,7 +917,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
           .saveAsTable("appendParquetToJson")
       }
-      assert(e.getMessage.contains("The file format of the existing table `appendParquetToJson` " +
+      assert(e.getMessage.contains("The file format of the existing table appendParquetToJson " +
         "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
         "It doesn't match the specified format `parquet`"))
     }
@@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         createDF(10, 19).write.mode(SaveMode.Append).format("text")
           .saveAsTable("appendTextToJson")
       }
-      assert(e.getMessage.contains("The file format of the existing table `appendTextToJson` is " +
+      assert(e.getMessage.contains("The file format of the existing table appendTextToJson is " +
         "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " +
         "It doesn't match the specified format `text`"))
     }
-- 
cgit v1.2.3
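
As a quick illustration of the convention this patch settles on, the sketch below mirrors, in plain Scala outside of Spark, the rule that `DataFrameWriter` and `SparkSqlAstBuilder` now share for deciding between a managed and an external data source table, instead of threading a `managedIfNoPath` flag into the physical command. This is a minimal standalone sketch: the `CatalogTableType` values and the case-insensitive `path` lookup are stubbed locally, whereas the real code uses Spark's internal `CatalogTableType` and `CaseInsensitiveMap`.

```scala
// Standalone sketch, not part of the patch: the table-type rule introduced by this change.
// CatalogTableType and the case-insensitive option lookup are stubbed so the snippet
// compiles and runs with plain Scala.
object TableTypeRuleSketch {
  sealed trait CatalogTableType
  case object MANAGED extends CatalogTableType   // no `path` option: Spark owns the table location
  case object EXTERNAL extends CatalogTableType  // user-supplied `path`: external table

  // A user-supplied `path` option (matched case-insensitively) marks the table as EXTERNAL;
  // otherwise it is MANAGED, and the command later fills in the default table path.
  def tableType(options: Map[String, String]): CatalogTableType =
    if (options.keys.exists(_.equalsIgnoreCase("path"))) EXTERNAL else MANAGED

  def main(args: Array[String]): Unit = {
    assert(tableType(Map("PATH" -> "/tmp/t")) == EXTERNAL)
    assert(tableType(Map("compression" -> "snappy")) == MANAGED)
    println("table-type rule matches the behavior described in the patch")
  }
}
```

With that decision made up front, the planner only needs to pass the whole `CatalogTable` plus `ignoreIfExists`, as in `CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)`, and the physical command derives everything else (schema, provider, partitioning, bucketing, managed vs. external) from it.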