From 1672149c2644b5670b4b9a4086a4456fb8279a55 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 23 Apr 2016 22:29:31 -0700 Subject: [SPARK-14879][SQL] Move CreateMetastoreDataSource and CreateMetastoreDataSourceAsSelect to sql/core ## What changes were proposed in this pull request? CreateMetastoreDataSource and CreateMetastoreDataSourceAsSelect are not Hive-specific. So, this PR moves them from sql/hive to sql/core. Also, I am adding `Command` suffix to these two classes. ## How was this patch tested? Existing tests. Author: Yin Huai Closes #12645 from yhuai/moveCreateDataSource. --- .../sql/catalyst/catalog/SessionCatalog.scala | 9 + .../spark/sql/execution/SparkStrategies.scala | 29 +- .../execution/command/createDataSourceTables.scala | 452 +++++++++++++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 192 --------- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 12 - .../apache/spark/sql/hive/HiveSessionState.scala | 1 - .../org/apache/spark/sql/hive/HiveStrategies.scala | 25 +- .../apache/spark/sql/hive/execution/commands.scala | 212 ---------- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 16 +- 9 files changed, 497 insertions(+), 451 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 152bd499a0..67b1752ee8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -21,6 +21,8 @@ import java.io.File import scala.collection.mutable +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} @@ -221,6 +223,13 @@ class SessionCatalog( inheritTableSpecs, isSkewedStoreAsSubdir) } + def defaultTablePath(tableIdent: TableIdentifier): String = { + val dbName = tableIdent.database.getOrElse(currentDb) + val dbLocation = getDatabaseMetadata(dbName).locationUri + + new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString + } + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3ce5f28bf3..3c10504fbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.Strategy +import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ @@ -421,10 +421,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ExecutedCommandExec( CreateTempTableUsing( tableIdent, userSpecifiedSchema, provider, opts)) :: Nil + case c: CreateTableUsing if 
!c.temporary => - sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + val cmd = + CreateDataSourceTableCommand( + c.tableIdent, + c.userSpecifiedSchema, + c.provider, + c.options, + c.allowExisting, + c.managedIfNoPath) + ExecutedCommandExec(cmd) :: Nil + case c: CreateTableUsing if c.temporary && c.allowExisting => - sys.error("allowExisting should be set to false when creating a temporary table.") + throw new AnalysisException( + "allowExisting should be set to false when creating a temporary table.") case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty => sys.error("Cannot create temporary partitioned table.") @@ -433,8 +444,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val cmd = CreateTempTableUsingAsSelect( c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child) ExecutedCommandExec(cmd) :: Nil + case c: CreateTableUsingAsSelect if !c.temporary => - sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + val cmd = + CreateDataSourceTableAsSelectCommand( + c.tableIdent, + c.provider, + c.partitionColumns, + c.bucketSpec, + c.mode, + c.options, + c.child) + ExecutedCommandExec(cmd) :: Nil case logical.ShowFunctions(db, pattern) => ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala new file mode 100644 index 0000000000..0ef1d1d688 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import java.util.regex.Pattern + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.HiveSerDe +import org.apache.spark.sql.sources.InsertableRelation +import org.apache.spark.sql.types._ + +/** + * A command used to create a data source table. + * + * Note: This is different from [[CreateTable]]. Please check the syntax for difference. + * This is not intended for temporary tables. 
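Editor's illustration (not part of the patch): the `DDLStrategy` change earlier in this diff now plans a persistent `CREATE TABLE ... USING` statement as this command. Roughly, and with made-up database, table, and option values, the planner's output looks like the sketch below; the field names and order match the case class defined in this file, while the flag values are illustrative only.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{IntegerType, StructType}

// Hypothetical translation of:
//   CREATE TABLE IF NOT EXISTS db1.events (id INT) USING parquet OPTIONS (path "/tmp/events")
val cmd = CreateDataSourceTableCommand(
  tableIdent = TableIdentifier("events", Some("db1")),
  userSpecifiedSchema = Some(new StructType().add("id", IntegerType)),
  provider = "parquet",
  options = Map("path" -> "/tmp/events"),
  ignoreIfExists = true,    // corresponds to IF NOT EXISTS
  managedIfNoPath = true)   // illustrative; the real flag is carried over from the parsed plan
```

The SQL form itself is documented in the continuation of this scaladoc just below.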
+ * + * The syntax of using this command in SQL is: + * {{{ + * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1 data_type [COMMENT col_comment], ...)] + * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...]) + * }}} + */ +case class CreateDataSourceTableCommand( + tableIdent: TableIdentifier, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String], + ignoreIfExists: Boolean, + managedIfNoPath: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Since we are saving metadata to metastore, we need to check if metastore supports + // the table name and database name we have for this query. MetaStoreUtils.validateName + // is the method used by Hive to check if a table name or a database name is valid for + // the metastore. + if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) { + throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + + s"metastore. Metastore only accepts table name containing characters, numbers and _.") + } + if (tableIdent.database.isDefined && + !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) { + throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + + s"for metastore. Metastore only accepts database name containing " + + s"characters, numbers and _.") + } + + val tableName = tableIdent.unquotedString + val sessionState = sqlContext.sessionState + + if (sessionState.catalog.tableExists(tableIdent)) { + if (ignoreIfExists) { + return Seq.empty[Row] + } else { + throw new AnalysisException(s"Table $tableName already exists.") + } + } + + var isExternal = true + val optionsWithPath = + if (!options.contains("path") && managedIfNoPath) { + isExternal = false + options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent)) + } else { + options + } + + // Create the relation to validate the arguments before writing the metadata to the metastore. + DataSource( + sqlContext = sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + className = provider, + bucketSpec = None, + options = optionsWithPath).resolveRelation() + + CreateDataSourceTableUtils.createDataSourceTable( + sqlContext = sqlContext, + tableIdent = tableIdent, + userSpecifiedSchema = userSpecifiedSchema, + partitionColumns = Array.empty[String], + bucketSpec = None, + provider = provider, + options = optionsWithPath, + isExternal = isExternal) + + Seq.empty[Row] + } +} + +/** + * A command used to create a data source table using the result of a query. + * + * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference. + * This is not intended for temporary tables. + * + * The syntax of using this command in SQL is: + * {{{ + * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name + * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...]) + * AS SELECT ... + * }}} + */ +case class CreateDataSourceTableAsSelectCommand( + tableIdent: TableIdentifier, + provider: String, + partitionColumns: Array[String], + bucketSpec: Option[BucketSpec], + mode: SaveMode, + options: Map[String, String], + query: LogicalPlan) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Since we are saving metadata to metastore, we need to check if metastore supports + // the table name and database name we have for this query. 
MetaStoreUtils.validateName + // is the method used by Hive to check if a table name or a database name is valid for + // the metastore. + if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) { + throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + + s"metastore. Metastore only accepts table name containing characters, numbers and _.") + } + if (tableIdent.database.isDefined && + !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) { + throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + + s"for metastore. Metastore only accepts database name containing " + + s"characters, numbers and _.") + } + + val tableName = tableIdent.unquotedString + val sessionState = sqlContext.sessionState + var createMetastoreTable = false + var isExternal = true + val optionsWithPath = + if (!options.contains("path")) { + isExternal = false + options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent)) + } else { + options + } + + var existingSchema = None: Option[StructType] + if (sqlContext.sessionState.catalog.tableExists(tableIdent)) { + // Check if we need to throw an exception or just return. + mode match { + case SaveMode.ErrorIfExists => + throw new AnalysisException(s"Table $tableName already exists. " + + s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " + + s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" + + s"the existing data. " + + s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.") + case SaveMode.Ignore => + // Since the table already exists and the save mode is Ignore, we will just return. + return Seq.empty[Row] + case SaveMode.Append => + // Check if the specified data source match the data source of the existing table. + val dataSource = DataSource( + sqlContext = sqlContext, + userSpecifiedSchema = Some(query.schema.asNullable), + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + className = provider, + options = optionsWithPath) + // TODO: Check that options from the resolved relation match the relation that we are + // inserting into (i.e. using the same compression). + + EliminateSubqueryAliases( + sessionState.catalog.lookupRelation(tableIdent)) match { + case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => + existingSchema = Some(l.schema) + case o => + throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") + } + case SaveMode.Overwrite => + sqlContext.sql(s"DROP TABLE IF EXISTS $tableName") + // Need to create the table again. + createMetastoreTable = true + } + } else { + // The table does not exist. We need to create it in metastore. + createMetastoreTable = true + } + + val data = Dataset.ofRows(sqlContext, query) + val df = existingSchema match { + // If we are inserting into an existing table, just use the existing schema. + case Some(s) => data.selectExpr(s.fieldNames: _*) + case None => data + } + + // Create the relation based on the data of df. + val dataSource = DataSource( + sqlContext, + className = provider, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + options = optionsWithPath) + + val result = dataSource.write(mode, df) + + if (createMetastoreTable) { + // We will use the schema of resolved.relation as the schema of the table (instead of + // the schema of df). 
It is important since the nullability may be changed by the relation + // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). + CreateDataSourceTableUtils.createDataSourceTable( + sqlContext = sqlContext, + tableIdent = tableIdent, + userSpecifiedSchema = Some(result.schema), + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + provider = provider, + options = optionsWithPath, + isExternal = isExternal) + } + + // Refresh the cache of the table in the catalog. + sessionState.catalog.refreshTable(tableIdent) + Seq.empty[Row] + } +} + +object CreateDataSourceTableUtils extends Logging { + /** + * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), + * i.e. if this name only contains characters, numbers, and _. + * + * This method is intended to have the same behavior of + * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName. + */ + def validateName(name: String): Boolean = { + val tpat = Pattern.compile("[\\w_]+") + val matcher = tpat.matcher(name) + + matcher.matches() + } + + def createDataSourceTable( + sqlContext: SQLContext, + tableIdent: TableIdentifier, + userSpecifiedSchema: Option[StructType], + partitionColumns: Array[String], + bucketSpec: Option[BucketSpec], + provider: String, + options: Map[String, String], + isExternal: Boolean): Unit = { + val tableProperties = new mutable.HashMap[String, String] + tableProperties.put("spark.sql.sources.provider", provider) + + // Saves optional user specified schema. Serialized JSON schema string may be too long to be + // stored into a single metastore SerDe property. In this case, we split the JSON string and + // store each part as a separate SerDe property. + userSpecifiedSchema.foreach { schema => + val threshold = sqlContext.sessionState.conf.schemaStringLengthThreshold + val schemaJsonString = schema.json + // Split the JSON string. + val parts = schemaJsonString.grouped(threshold).toSeq + tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString) + parts.zipWithIndex.foreach { case (part, index) => + tableProperties.put(s"spark.sql.sources.schema.part.$index", part) + } + } + + if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) { + tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString) + partitionColumns.zipWithIndex.foreach { case (partCol, index) => + tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol) + } + } + + if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) { + val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get + + tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString) + tableProperties.put("spark.sql.sources.schema.numBucketCols", + bucketColumnNames.length.toString) + bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => + tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol) + } + + if (sortColumnNames.nonEmpty) { + tableProperties.put("spark.sql.sources.schema.numSortCols", + sortColumnNames.length.toString) + sortColumnNames.zipWithIndex.foreach { case (sortCol, index) => + tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol) + } + } + } + + if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. 
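Editor's illustration (not part of the patch): the table-property layout written just above stores the schema JSON in numbered chunks, so reading it back is a matter of concatenating the parts in index order. A minimal sketch of that reverse operation, assuming a `props` map holding the table's metastore properties (`DataType.fromJson` is the standard Spark deserializer for the JSON produced by `StructType.json`):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Rebuild a schema stored as spark.sql.sources.schema.numParts / spark.sql.sources.schema.part.N.
def schemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
  props.get("spark.sql.sources.schema.numParts").map { numParts =>
    val json = (0 until numParts.toInt)
      .map(i => props(s"spark.sql.sources.schema.part.$i"))
      .mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }
}
```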
However, if there are specified partition columns, + // we simply ignore them and provide a warning message. + logWarning( + s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " + + s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") + } + + val tableType = if (isExternal) { + tableProperties.put("EXTERNAL", "TRUE") + CatalogTableType.EXTERNAL_TABLE + } else { + tableProperties.put("EXTERNAL", "FALSE") + CatalogTableType.MANAGED_TABLE + } + + val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sqlContext.sessionState.conf) + val dataSource = + DataSource( + sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + className = provider, + options = options) + + def newSparkSQLSpecificMetastoreTable(): CatalogTable = { + CatalogTable( + identifier = tableIdent, + tableType = tableType, + schema = Nil, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + serdeProperties = options + ), + properties = tableProperties.toMap) + } + + def newHiveCompatibleMetastoreTable( + relation: HadoopFsRelation, + serde: HiveSerDe): CatalogTable = { + assert(partitionColumns.isEmpty) + assert(relation.partitionSchema.isEmpty) + + CatalogTable( + identifier = tableIdent, + tableType = tableType, + storage = CatalogStorageFormat( + locationUri = Some(relation.location.paths.map(_.toUri.toString).head), + inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde, + serdeProperties = options + ), + schema = relation.schema.map { f => + CatalogColumn(f.name, f.dataType.catalogString) + }, + properties = tableProperties.toMap, + viewText = None) + } + + // TODO: Support persisting partitioned data source relations in Hive compatible format + val qualifiedTableName = tableIdent.quotedString + val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean + val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match { + case _ if skipHiveMetadata => + val message = + s"Persisting partitioned data source relation $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + + case (Some(serde), relation: HadoopFsRelation) + if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty => + val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) + val message = + s"Persisting data source relation $qualifiedTableName with a single input path " + + s"into Hive metastore in Hive compatible format. Input path: " + + s"${relation.location.paths.head}." + (Some(hiveTable), message) + + case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty => + val message = + s"Persisting partitioned data source relation $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + + "Input path(s): " + relation.location.paths.mkString("\n", "\n", "") + (None, message) + + case (Some(serde), relation: HadoopFsRelation) => + val message = + s"Persisting data source relation $qualifiedTableName with multiple input paths into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. 
" + + s"Input paths: " + relation.location.paths.mkString("\n", "\n", "") + (None, message) + + case (Some(serde), _) => + val message = + s"Data source relation $qualifiedTableName is not a " + + s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " + + "in Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + + case _ => + val message = + s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + s"Persisting data source relation $qualifiedTableName into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive." + (None, message) + } + + (hiveCompatibleTable, logMessage) match { + case (Some(table), message) => + // We first try to save the metadata of the table in a Hive compatible way. + // If Hive throws an error, we fall back to save its metadata in the Spark SQL + // specific way. + try { + logInfo(message) + sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false) + } catch { + case NonFatal(e) => + val warningMessage = + s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " + + s"it into Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + val table = newSparkSQLSpecificMetastoreTable() + sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false) + } + + case (None, message) => + logWarning(message) + val table = newSparkSQLSpecificMetastoreTable() + sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9353e9ccd2..13f29e08fb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -156,198 +156,6 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) } - def createDataSourceTable( - tableIdent: TableIdentifier, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - - val tableProperties = new mutable.HashMap[String, String] - tableProperties.put("spark.sql.sources.provider", provider) - - // Saves optional user specified schema. Serialized JSON schema string may be too long to be - // stored into a single metastore SerDe property. In this case, we split the JSON string and - // store each part as a separate SerDe property. - userSpecifiedSchema.foreach { schema => - val threshold = conf.schemaStringLengthThreshold - val schemaJsonString = schema.json - // Split the JSON string. 
- val parts = schemaJsonString.grouped(threshold).toSeq - tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString) - parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"spark.sql.sources.schema.part.$index", part) - } - } - - if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) { - tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString) - partitionColumns.zipWithIndex.foreach { case (partCol, index) => - tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol) - } - } - - if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get - - tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString) - tableProperties.put("spark.sql.sources.schema.numBucketCols", - bucketColumnNames.length.toString) - bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => - tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol) - } - - if (sortColumnNames.nonEmpty) { - tableProperties.put("spark.sql.sources.schema.numSortCols", - sortColumnNames.length.toString) - sortColumnNames.zipWithIndex.foreach { case (sortCol, index) => - tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol) - } - } - } - - if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) { - // The table does not have a specified schema, which means that the schema will be inferred - // when we load the table. So, we are not expecting partition columns and we will discover - // partitions when we load the table. However, if there are specified partition columns, - // we simply ignore them and provide a warning message. - logWarning( - s"The schema and partitions of table $tableIdent will be inferred when it is loaded. 
" + - s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") - } - - val tableType = if (isExternal) { - tableProperties.put("EXTERNAL", "TRUE") - CatalogTableType.EXTERNAL_TABLE - } else { - tableProperties.put("EXTERNAL", "FALSE") - CatalogTableType.MANAGED_TABLE - } - - val maybeSerDe = HiveSerDe.sourceToSerDe(provider, conf) - val dataSource = - DataSource( - hive, - userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = provider, - options = options) - - def newSparkSQLSpecificMetastoreTable(): CatalogTable = { - CatalogTable( - identifier = TableIdentifier(tblName, Option(dbName)), - tableType = tableType, - schema = Nil, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - serdeProperties = options - ), - properties = tableProperties.toMap) - } - - def newHiveCompatibleMetastoreTable( - relation: HadoopFsRelation, - serde: HiveSerDe): CatalogTable = { - assert(partitionColumns.isEmpty) - assert(relation.partitionSchema.isEmpty) - - CatalogTable( - identifier = TableIdentifier(tblName, Option(dbName)), - tableType = tableType, - storage = CatalogStorageFormat( - locationUri = Some(relation.location.paths.map(_.toUri.toString).head), - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde, - serdeProperties = options - ), - schema = relation.schema.map { f => - CatalogColumn(f.name, f.dataType.catalogString) - }, - properties = tableProperties.toMap, - viewText = None) // TODO: We need to place the SQL string here - } - - // TODO: Support persisting partitioned data source relations in Hive compatible format - val qualifiedTableName = tableIdent.quotedString - val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean - val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match { - case _ if skipHiveMetadata => - val message = - s"Persisting partitioned data source relation $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - - case (Some(serde), relation: HadoopFsRelation) - if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty => - val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) - val message = - s"Persisting data source relation $qualifiedTableName with a single input path " + - s"into Hive metastore in Hive compatible format. Input path: " + - s"${relation.location.paths.head}." - (Some(hiveTable), message) - - case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty => - val message = - s"Persisting partitioned data source relation $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + - "Input path(s): " + relation.location.paths.mkString("\n", "\n", "") - (None, message) - - case (Some(serde), relation: HadoopFsRelation) => - val message = - s"Persisting data source relation $qualifiedTableName with multiple input paths into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + - s"Input paths: " + relation.location.paths.mkString("\n", "\n", "") - (None, message) - - case (Some(serde), _) => - val message = - s"Data source relation $qualifiedTableName is not a " + - s"${classOf[HadoopFsRelation].getSimpleName}. 
Persisting it into Hive metastore " + - "in Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - - case _ => - val message = - s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + - s"Persisting data source relation $qualifiedTableName into Hive metastore in " + - s"Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - } - - (hiveCompatibleTable, logMessage) match { - case (Some(table), message) => - // We first try to save the metadata of the table in a Hive compatible way. - // If Hive throws an error, we fall back to save its metadata in the Spark SQL - // specific way. - try { - logInfo(message) - client.createTable(table, ignoreIfExists = false) - } catch { - case throwable: Throwable => - val warningMessage = - s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " + - s"it into Hive metastore in Spark SQL specific format." - logWarning(warningMessage, throwable) - val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable() - client.createTable(sparkSqlSpecificTable, ignoreIfExists = false) - } - - case (None, message) => - logWarning(message) - val hiveTable = newSparkSQLSpecificMetastoreTable() - client.createTable(hiveTable, ignoreIfExists = false) - } - } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 4f9513389c..3e718826fe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -103,18 +103,6 @@ private[sql] class HiveSessionCatalog( metastoreCatalog.cachedDataSourceTables.invalidateAll() } - def createDataSourceTable( - name: TableIdentifier, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - metastoreCatalog.createDataSourceTable( - name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal) - } - def hiveDefaultTableFilePath(name: TableIdentifier): String = { metastoreCatalog.hiveDefaultTableFilePath(name) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index a22e19207e..636bfdebf7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -110,7 +110,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) experimentalMethods.extraStrategies ++ Seq( FileSourceStrategy, DataSourceStrategy, - HiveDDLStrategy, DDLStrategy, SpecialLimits, InMemoryScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 2d36ddafe6..2bea32b144 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, CreateTempTableUsingAsSelect} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.execution._ private[hive] trait HiveStrategies { @@ -83,27 +83,4 @@ private[hive] trait HiveStrategies { Nil } } - - object HiveDDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing( - tableIdent, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) => - val cmd = - CreateMetastoreDataSource( - tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath) - ExecutedCommandExec(cmd) :: Nil - - case c: CreateTableUsingAsSelect if c.temporary => - val cmd = CreateTempTableUsingAsSelect( - c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child) - ExecutedCommandExec(cmd) :: Nil - - case c: CreateTableUsingAsSelect => - val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns, - c.bucketSpec, c.mode, c.options, c.child) - ExecutedCommandExec(cmd) :: Nil - - case _ => Nil - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala deleted file mode 100644 index 6899f46eec..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.hadoop.hive.metastore.MetaStoreUtils - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.hive.HiveSessionState -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ - - -private[hive] -case class CreateMetastoreDataSource( - tableIdent: TableIdentifier, - userSpecifiedSchema: Option[StructType], - provider: String, - options: Map[String, String], - allowExisting: Boolean, - managedIfNoPath: Boolean) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Since we are saving metadata to metastore, we need to check if metastore supports - // the table name and database name we have for this query. 
MetaStoreUtils.validateName - // is the method used by Hive to check if a table name or a database name is valid for - // the metastore. - if (!MetaStoreUtils.validateName(tableIdent.table)) { - throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } - if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) { - throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + - s"for metastore. Metastore only accepts database name containing " + - s"characters, numbers and _.") - } - - val tableName = tableIdent.unquotedString - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - - if (sessionState.catalog.tableExists(tableIdent)) { - if (allowExisting) { - return Seq.empty[Row] - } else { - throw new AnalysisException(s"Table $tableName already exists.") - } - } - - var isExternal = true - val optionsWithPath = - if (!options.contains("path") && managedIfNoPath) { - isExternal = false - options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) - } else { - options - } - - // Create the relation to validate the arguments before writing the metadata to the metastore. - DataSource( - sqlContext = sqlContext, - userSpecifiedSchema = userSpecifiedSchema, - className = provider, - bucketSpec = None, - options = optionsWithPath).resolveRelation() - - sessionState.catalog.createDataSourceTable( - tableIdent, - userSpecifiedSchema, - Array.empty[String], - bucketSpec = None, - provider, - optionsWithPath, - isExternal) - - Seq.empty[Row] - } -} - -private[hive] -case class CreateMetastoreDataSourceAsSelect( - tableIdent: TableIdentifier, - provider: String, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - mode: SaveMode, - options: Map[String, String], - query: LogicalPlan) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Since we are saving metadata to metastore, we need to check if metastore supports - // the table name and database name we have for this query. MetaStoreUtils.validateName - // is the method used by Hive to check if a table name or a database name is valid for - // the metastore. - if (!MetaStoreUtils.validateName(tableIdent.table)) { - throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } - if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) { - throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " + - s"for metastore. Metastore only accepts database name containing " + - s"characters, numbers and _.") - } - - val tableName = tableIdent.unquotedString - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] - var createMetastoreTable = false - var isExternal = true - val optionsWithPath = - if (!options.contains("path")) { - isExternal = false - options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) - } else { - options - } - - var existingSchema = None: Option[StructType] - if (sqlContext.sessionState.catalog.tableExists(tableIdent)) { - // Check if we need to throw an exception or just return. - mode match { - case SaveMode.ErrorIfExists => - throw new AnalysisException(s"Table $tableName already exists. 
" + - s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " + - s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" + - s"the existing data. " + - s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.") - case SaveMode.Ignore => - // Since the table already exists and the save mode is Ignore, we will just return. - return Seq.empty[Row] - case SaveMode.Append => - // Check if the specified data source match the data source of the existing table. - val dataSource = DataSource( - sqlContext = sqlContext, - userSpecifiedSchema = Some(query.schema.asNullable), - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = provider, - options = optionsWithPath) - // TODO: Check that options from the resolved relation match the relation that we are - // inserting into (i.e. using the same compression). - - EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(tableIdent)) match { - case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => - existingSchema = Some(l.schema) - case o => - throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") - } - case SaveMode.Overwrite => - sqlContext.sql(s"DROP TABLE IF EXISTS $tableName") - // Need to create the table again. - createMetastoreTable = true - } - } else { - // The table does not exist. We need to create it in metastore. - createMetastoreTable = true - } - - val data = Dataset.ofRows(sqlContext, query) - val df = existingSchema match { - // If we are inserting into an existing table, just use the existing schema. - case Some(s) => data.selectExpr(s.fieldNames: _*) - case None => data - } - - // Create the relation based on the data of df. - val dataSource = DataSource( - sqlContext, - className = provider, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - options = optionsWithPath) - - val result = dataSource.write(mode, df) - - if (createMetastoreTable) { - // We will use the schema of resolved.relation as the schema of the table (instead of - // the schema of df). It is important since the nullability may be changed by the relation - // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - sessionState.catalog.createDataSourceTable( - tableIdent, - Some(result.schema), - partitionColumns, - bucketSpec, - provider, - optionsWithPath, - isExternal) - } - - // Refresh the cache of the table in the catalog. 
- sessionState.catalog.refreshTable(tableIdent) - Seq.empty[Row] - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 11165a7ebb..68244cdb11 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.DataTypeParser +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -699,8 +700,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. - sessionState.catalog.createDataSourceTable( - name = TableIdentifier("wide_schema"), + CreateDataSourceTableUtils.createDataSourceTable( + sqlContext = sqlContext, + tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -907,8 +909,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTempDir { tempPath => val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - sessionState.catalog.createDataSourceTable( - name = TableIdentifier("not_skip_hive_metadata"), + CreateDataSourceTableUtils.createDataSourceTable( + sqlContext = sqlContext, + tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -921,8 +924,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema .forall(column => DataTypeParser.parse(column.dataType) == StringType)) - sessionState.catalog.createDataSourceTable( - name = TableIdentifier("skip_hive_metadata"), + CreateDataSourceTableUtils.createDataSourceTable( + sqlContext = sqlContext, + tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, -- cgit v1.2.3
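Editor's note, appended for illustration and not part of the patch: after this change, the persistent forms of `CREATE TABLE ... USING` documented in the new `createDataSourceTables.scala` are planned in sql/core by `DDLStrategy` rather than by the removed `HiveDDLStrategy`. A minimal usage sketch against a Spark 2.0-era `SQLContext`, assuming a session whose catalog supports persistent tables; the database, table names, and paths are made up:

```scala
// Planned by DDLStrategy as CreateDataSourceTableCommand:
sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS db1.events (id INT, name STRING)
    |USING parquet
    |OPTIONS (path "/tmp/events")""".stripMargin)

// Planned by DDLStrategy as CreateDataSourceTableAsSelectCommand:
sqlContext.sql(
  """CREATE TABLE db1.event_names
    |USING json
    |OPTIONS (path "/tmp/event_names")
    |AS SELECT name FROM db1.events""".stripMargin)
```

Both statements follow the syntax documented in the scaladoc of the two new command classes; the temporary-table variants continue to be handled by the pre-existing `CreateTempTableUsing` and `CreateTempTableUsingAsSelect` commands.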