author     Yin Huai <yhuai@databricks.com>    2016-04-23 22:29:31 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-04-23 22:29:31 -0700
commit     1672149c2644b5670b4b9a4086a4456fb8279a55 (patch)
tree       0b90c17e83d037de61577acd360446803daa6c98 /sql
parent     28538596558b7f69f9d22eb0902d0e609d98be88 (diff)
[SPARK-14879][SQL] Move CreateMetastoreDataSource and CreateMetastoreDataSourceAsSelect to sql/core
## What changes were proposed in this pull request?

CreateMetastoreDataSource and CreateMetastoreDataSourceAsSelect are not Hive-specific. So, this PR moves them from sql/hive to sql/core. Also, I am adding the `Command` suffix to these two classes.

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12645 from yhuai/moveCreateDataSource.
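For context, here is the kind of statement this patch makes work without Hive support: the DDL strategy in sql/core now plans a non-temporary `CREATE TABLE ... USING` as a `CreateDataSourceTableCommand` instead of failing with "Tables created with SQLContext must be TEMPORARY". The snippet below is a minimal sketch against the Spark 2.0-era `SQLContext` API, not part of the patch; the table name and path are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical session setup; any existing SQLContext behaves the same way.
val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

// Before this patch, a plain (non-Hive) SQLContext rejected this statement;
// after it, the command is executed by CreateDataSourceTableCommand in sql/core.
sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS my_datasource_table
    |USING parquet
    |OPTIONS (path "/tmp/my_datasource_table")
  """.stripMargin)
```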
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      |   9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                |  29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 452
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                | 192
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala                  |  12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                    |   1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                      |  25
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala                  | 212
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala           |  16
9 files changed, 497 insertions, 451 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 152bd499a0..67b1752ee8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,8 @@ import java.io.File
import scala.collection.mutable
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
@@ -221,6 +223,13 @@ class SessionCatalog(
inheritTableSpecs, isSkewedStoreAsSubdir)
}
+ def defaultTablePath(tableIdent: TableIdentifier): String = {
+ val dbName = tableIdent.database.getOrElse(currentDb)
+ val dbLocation = getDatabaseMetadata(dbName).locationUri
+
+ new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+ }
+
// -------------------------------------------------------------
// | Methods that interact with temporary and metastore tables |
// -------------------------------------------------------------
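The new `defaultTablePath` helper above only composes the database location with the (formatted) table name using Hadoop's `Path`. A quick illustration of that composition, with a hypothetical database location (not part of the patch):

```scala
import org.apache.hadoop.fs.Path

// In SessionCatalog the database location comes from
// getDatabaseMetadata(dbName).locationUri; this value is hypothetical.
val dbLocation = "hdfs://nn:8020/user/warehouse/mydb.db"

// Same composition as defaultTablePath: parent path joined with the table name.
val tablePath = new Path(new Path(dbLocation), "my_table").toString
// tablePath == "hdfs://nn:8020/user/warehouse/mydb.db/my_table"
```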
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3ce5f28bf3..3c10504fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
@@ -421,10 +421,21 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
ExecutedCommandExec(
CreateTempTableUsing(
tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
+
case c: CreateTableUsing if !c.temporary =>
- sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+ val cmd =
+ CreateDataSourceTableCommand(
+ c.tableIdent,
+ c.userSpecifiedSchema,
+ c.provider,
+ c.options,
+ c.allowExisting,
+ c.managedIfNoPath)
+ ExecutedCommandExec(cmd) :: Nil
+
case c: CreateTableUsing if c.temporary && c.allowExisting =>
- sys.error("allowExisting should be set to false when creating a temporary table.")
+ throw new AnalysisException(
+ "allowExisting should be set to false when creating a temporary table.")
case c: CreateTableUsingAsSelect if c.temporary && c.partitionColumns.nonEmpty =>
sys.error("Cannot create temporary partitioned table.")
@@ -433,8 +444,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
val cmd = CreateTempTableUsingAsSelect(
c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
ExecutedCommandExec(cmd) :: Nil
+
case c: CreateTableUsingAsSelect if !c.temporary =>
- sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+ val cmd =
+ CreateDataSourceTableAsSelectCommand(
+ c.tableIdent,
+ c.provider,
+ c.partitionColumns,
+ c.bucketSpec,
+ c.mode,
+ c.options,
+ c.child)
+ ExecutedCommandExec(cmd) :: Nil
case logical.ShowFunctions(db, pattern) =>
ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
new file mode 100644
index 0000000000..0ef1d1d688
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types._
+
+/**
+ * A command used to create a data source table.
+ *
+ * Note: This is different from [[CreateTable]]. Please check the syntax for difference.
+ * This is not intended for temporary tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ * [(col1 data_type [COMMENT col_comment], ...)]
+ * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ * }}}
+ */
+case class CreateDataSourceTableCommand(
+ tableIdent: TableIdentifier,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String],
+ ignoreIfExists: Boolean,
+ managedIfNoPath: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ // Since we are saving metadata to metastore, we need to check if metastore supports
+ // the table name and database name we have for this query. MetaStoreUtils.validateName
+ // is the method used by Hive to check if a table name or a database name is valid for
+ // the metastore.
+ if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
+ throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
+ s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+ }
+ if (tableIdent.database.isDefined &&
+ !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
+ throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
+ s"for metastore. Metastore only accepts database name containing " +
+ s"characters, numbers and _.")
+ }
+
+ val tableName = tableIdent.unquotedString
+ val sessionState = sqlContext.sessionState
+
+ if (sessionState.catalog.tableExists(tableIdent)) {
+ if (ignoreIfExists) {
+ return Seq.empty[Row]
+ } else {
+ throw new AnalysisException(s"Table $tableName already exists.")
+ }
+ }
+
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path") && managedIfNoPath) {
+ isExternal = false
+ options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
+ } else {
+ options
+ }
+
+ // Create the relation to validate the arguments before writing the metadata to the metastore.
+ DataSource(
+ sqlContext = sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ className = provider,
+ bucketSpec = None,
+ options = optionsWithPath).resolveRelation()
+
+ CreateDataSourceTableUtils.createDataSourceTable(
+ sqlContext = sqlContext,
+ tableIdent = tableIdent,
+ userSpecifiedSchema = userSpecifiedSchema,
+ partitionColumns = Array.empty[String],
+ bucketSpec = None,
+ provider = provider,
+ options = optionsWithPath,
+ isExternal = isExternal)
+
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * A command used to create a data source table using the result of a query.
+ *
+ * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference.
+ * This is not intended for temporary tables.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
+ * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ * AS SELECT ...
+ * }}}
+ */
+case class CreateDataSourceTableAsSelectCommand(
+ tableIdent: TableIdentifier,
+ provider: String,
+ partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
+ mode: SaveMode,
+ options: Map[String, String],
+ query: LogicalPlan)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ // Since we are saving metadata to metastore, we need to check if metastore supports
+ // the table name and database name we have for this query. MetaStoreUtils.validateName
+ // is the method used by Hive to check if a table name or a database name is valid for
+ // the metastore.
+ if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
+ throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
+ s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+ }
+ if (tableIdent.database.isDefined &&
+ !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
+ throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
+ s"for metastore. Metastore only accepts database name containing " +
+ s"characters, numbers and _.")
+ }
+
+ val tableName = tableIdent.unquotedString
+ val sessionState = sqlContext.sessionState
+ var createMetastoreTable = false
+ var isExternal = true
+ val optionsWithPath =
+ if (!options.contains("path")) {
+ isExternal = false
+ options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
+ } else {
+ options
+ }
+
+ var existingSchema = None: Option[StructType]
+ if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
+ // Check if we need to throw an exception or just return.
+ mode match {
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(s"Table $tableName already exists. " +
+ s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
+ s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
+ s"the existing data. " +
+ s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
+ case SaveMode.Ignore =>
+ // Since the table already exists and the save mode is Ignore, we will just return.
+ return Seq.empty[Row]
+ case SaveMode.Append =>
+ // Check if the specified data source match the data source of the existing table.
+ val dataSource = DataSource(
+ sqlContext = sqlContext,
+ userSpecifiedSchema = Some(query.schema.asNullable),
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ className = provider,
+ options = optionsWithPath)
+ // TODO: Check that options from the resolved relation match the relation that we are
+ // inserting into (i.e. using the same compression).
+
+ EliminateSubqueryAliases(
+ sessionState.catalog.lookupRelation(tableIdent)) match {
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+ existingSchema = Some(l.schema)
+ case o =>
+ throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
+ }
+ case SaveMode.Overwrite =>
+ sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+ // Need to create the table again.
+ createMetastoreTable = true
+ }
+ } else {
+ // The table does not exist. We need to create it in metastore.
+ createMetastoreTable = true
+ }
+
+ val data = Dataset.ofRows(sqlContext, query)
+ val df = existingSchema match {
+ // If we are inserting into an existing table, just use the existing schema.
+ case Some(s) => data.selectExpr(s.fieldNames: _*)
+ case None => data
+ }
+
+ // Create the relation based on the data of df.
+ val dataSource = DataSource(
+ sqlContext,
+ className = provider,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ options = optionsWithPath)
+
+ val result = dataSource.write(mode, df)
+
+ if (createMetastoreTable) {
+ // We will use the schema of resolved.relation as the schema of the table (instead of
+ // the schema of df). It is important since the nullability may be changed by the relation
+ // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
+ CreateDataSourceTableUtils.createDataSourceTable(
+ sqlContext = sqlContext,
+ tableIdent = tableIdent,
+ userSpecifiedSchema = Some(result.schema),
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ provider = provider,
+ options = optionsWithPath,
+ isExternal = isExternal)
+ }
+
+ // Refresh the cache of the table in the catalog.
+ sessionState.catalog.refreshTable(tableIdent)
+ Seq.empty[Row]
+ }
+}
+
+object CreateDataSourceTableUtils extends Logging {
+ /**
+ * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
+ * i.e. if this name only contains characters, numbers, and _.
+ *
+ * This method is intended to have the same behavior of
+ * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
+ */
+ def validateName(name: String): Boolean = {
+ val tpat = Pattern.compile("[\\w_]+")
+ val matcher = tpat.matcher(name)
+
+ matcher.matches()
+ }
+
+ def createDataSourceTable(
+ sqlContext: SQLContext,
+ tableIdent: TableIdentifier,
+ userSpecifiedSchema: Option[StructType],
+ partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
+ provider: String,
+ options: Map[String, String],
+ isExternal: Boolean): Unit = {
+ val tableProperties = new mutable.HashMap[String, String]
+ tableProperties.put("spark.sql.sources.provider", provider)
+
+ // Saves optional user specified schema. Serialized JSON schema string may be too long to be
+ // stored into a single metastore SerDe property. In this case, we split the JSON string and
+ // store each part as a separate SerDe property.
+ userSpecifiedSchema.foreach { schema =>
+ val threshold = sqlContext.sessionState.conf.schemaStringLengthThreshold
+ val schemaJsonString = schema.json
+ // Split the JSON string.
+ val parts = schemaJsonString.grouped(threshold).toSeq
+ tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
+ parts.zipWithIndex.foreach { case (part, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+ }
+ }
+
+ if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
+ tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+ partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
+ }
+ }
+
+ if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+ val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+ tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
+ tableProperties.put("spark.sql.sources.schema.numBucketCols",
+ bucketColumnNames.length.toString)
+ bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
+ }
+
+ if (sortColumnNames.nonEmpty) {
+ tableProperties.put("spark.sql.sources.schema.numSortCols",
+ sortColumnNames.length.toString)
+ sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
+ }
+ }
+ }
+
+ if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
+ // The table does not have a specified schema, which means that the schema will be inferred
+ // when we load the table. So, we are not expecting partition columns and we will discover
+ // partitions when we load the table. However, if there are specified partition columns,
+ // we simply ignore them and provide a warning message.
+ logWarning(
+ s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
+ s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
+ }
+
+ val tableType = if (isExternal) {
+ tableProperties.put("EXTERNAL", "TRUE")
+ CatalogTableType.EXTERNAL_TABLE
+ } else {
+ tableProperties.put("EXTERNAL", "FALSE")
+ CatalogTableType.MANAGED_TABLE
+ }
+
+ val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sqlContext.sessionState.conf)
+ val dataSource =
+ DataSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ partitionColumns = partitionColumns,
+ bucketSpec = bucketSpec,
+ className = provider,
+ options = options)
+
+ def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+ CatalogTable(
+ identifier = tableIdent,
+ tableType = tableType,
+ schema = Nil,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = options
+ ),
+ properties = tableProperties.toMap)
+ }
+
+ def newHiveCompatibleMetastoreTable(
+ relation: HadoopFsRelation,
+ serde: HiveSerDe): CatalogTable = {
+ assert(partitionColumns.isEmpty)
+ assert(relation.partitionSchema.isEmpty)
+
+ CatalogTable(
+ identifier = tableIdent,
+ tableType = tableType,
+ storage = CatalogStorageFormat(
+ locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde,
+ serdeProperties = options
+ ),
+ schema = relation.schema.map { f =>
+ CatalogColumn(f.name, f.dataType.catalogString)
+ },
+ properties = tableProperties.toMap,
+ viewText = None)
+ }
+
+ // TODO: Support persisting partitioned data source relations in Hive compatible format
+ val qualifiedTableName = tableIdent.quotedString
+ val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
+ val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
+ case _ if skipHiveMetadata =>
+ val message =
+ s"Persisting partitioned data source relation $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+
+ case (Some(serde), relation: HadoopFsRelation)
+ if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
+ val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+ val message =
+ s"Persisting data source relation $qualifiedTableName with a single input path " +
+ s"into Hive metastore in Hive compatible format. Input path: " +
+ s"${relation.location.paths.head}."
+ (Some(hiveTable), message)
+
+ case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
+ val message =
+ s"Persisting partitioned data source relation $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
+ (None, message)
+
+ case (Some(serde), relation: HadoopFsRelation) =>
+ val message =
+ s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+ s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
+ (None, message)
+
+ case (Some(serde), _) =>
+ val message =
+ s"Data source relation $qualifiedTableName is not a " +
+ s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+ "in Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+
+ case _ =>
+ val message =
+ s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+ s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+ s"Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+ }
+
+ (hiveCompatibleTable, logMessage) match {
+ case (Some(table), message) =>
+ // We first try to save the metadata of the table in a Hive compatible way.
+ // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+ // specific way.
+ try {
+ logInfo(message)
+ sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ } catch {
+ case NonFatal(e) =>
+ val warningMessage =
+ s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+ s"it into Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, e)
+ val table = newSparkSQLSpecificMetastoreTable()
+ sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ }
+
+ case (None, message) =>
+ logWarning(message)
+ val table = newSparkSQLSpecificMetastoreTable()
+ sqlContext.sessionState.catalog.createTable(table, ignoreIfExists = false)
+ }
+ }
+}
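To make the name validation introduced above concrete: `CreateDataSourceTableUtils.validateName` accepts only names made of word characters and underscores, mirroring Hive's `MetaStoreUtils.validateName`. A small REPL-style check using the same pattern (not part of the patch):

```scala
import java.util.regex.Pattern

// Same pattern as CreateDataSourceTableUtils.validateName: "[\\w_]+".
def validateName(name: String): Boolean =
  Pattern.compile("[\\w_]+").matcher(name).matches()

validateName("my_table_1")  // true
validateName("db.table")    // false: '.' is not a word character
validateName("bad-name")    // false: '-' is not a word character
validateName("")            // false: at least one character is required
```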
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9353e9ccd2..13f29e08fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -156,198 +156,6 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}
- def createDataSourceTable(
- tableIdent: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-
- val tableProperties = new mutable.HashMap[String, String]
- tableProperties.put("spark.sql.sources.provider", provider)
-
- // Saves optional user specified schema. Serialized JSON schema string may be too long to be
- // stored into a single metastore SerDe property. In this case, we split the JSON string and
- // store each part as a separate SerDe property.
- userSpecifiedSchema.foreach { schema =>
- val threshold = conf.schemaStringLengthThreshold
- val schemaJsonString = schema.json
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
- parts.zipWithIndex.foreach { case (part, index) =>
- tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
- }
- }
-
- if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
- tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
- partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
- }
- }
-
- if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
- val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
- tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
- tableProperties.put("spark.sql.sources.schema.numBucketCols",
- bucketColumnNames.length.toString)
- bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
- }
-
- if (sortColumnNames.nonEmpty) {
- tableProperties.put("spark.sql.sources.schema.numSortCols",
- sortColumnNames.length.toString)
- sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
- }
- }
- }
-
- if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
- // The table does not have a specified schema, which means that the schema will be inferred
- // when we load the table. So, we are not expecting partition columns and we will discover
- // partitions when we load the table. However, if there are specified partition columns,
- // we simply ignore them and provide a warning message.
- logWarning(
- s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
- s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
- }
-
- val tableType = if (isExternal) {
- tableProperties.put("EXTERNAL", "TRUE")
- CatalogTableType.EXTERNAL_TABLE
- } else {
- tableProperties.put("EXTERNAL", "FALSE")
- CatalogTableType.MANAGED_TABLE
- }
-
- val maybeSerDe = HiveSerDe.sourceToSerDe(provider, conf)
- val dataSource =
- DataSource(
- hive,
- userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- className = provider,
- options = options)
-
- def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
- CatalogTable(
- identifier = TableIdentifier(tblName, Option(dbName)),
- tableType = tableType,
- schema = Nil,
- storage = CatalogStorageFormat(
- locationUri = None,
- inputFormat = None,
- outputFormat = None,
- serde = None,
- serdeProperties = options
- ),
- properties = tableProperties.toMap)
- }
-
- def newHiveCompatibleMetastoreTable(
- relation: HadoopFsRelation,
- serde: HiveSerDe): CatalogTable = {
- assert(partitionColumns.isEmpty)
- assert(relation.partitionSchema.isEmpty)
-
- CatalogTable(
- identifier = TableIdentifier(tblName, Option(dbName)),
- tableType = tableType,
- storage = CatalogStorageFormat(
- locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
- inputFormat = serde.inputFormat,
- outputFormat = serde.outputFormat,
- serde = serde.serde,
- serdeProperties = options
- ),
- schema = relation.schema.map { f =>
- CatalogColumn(f.name, f.dataType.catalogString)
- },
- properties = tableProperties.toMap,
- viewText = None) // TODO: We need to place the SQL string here
- }
-
- // TODO: Support persisting partitioned data source relations in Hive compatible format
- val qualifiedTableName = tableIdent.quotedString
- val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
- val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.resolveRelation()) match {
- case _ if skipHiveMetadata =>
- val message =
- s"Persisting partitioned data source relation $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
-
- case (Some(serde), relation: HadoopFsRelation)
- if relation.location.paths.length == 1 && relation.partitionSchema.isEmpty =>
- val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
- val message =
- s"Persisting data source relation $qualifiedTableName with a single input path " +
- s"into Hive metastore in Hive compatible format. Input path: " +
- s"${relation.location.paths.head}."
- (Some(hiveTable), message)
-
- case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
- val message =
- s"Persisting partitioned data source relation $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
- "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
- (None, message)
-
- case (Some(serde), relation: HadoopFsRelation) =>
- val message =
- s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
- s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
- (None, message)
-
- case (Some(serde), _) =>
- val message =
- s"Data source relation $qualifiedTableName is not a " +
- s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
- "in Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
-
- case _ =>
- val message =
- s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
- s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
- }
-
- (hiveCompatibleTable, logMessage) match {
- case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatible way.
- // If Hive throws an error, we fall back to save its metadata in the Spark SQL
- // specific way.
- try {
- logInfo(message)
- client.createTable(table, ignoreIfExists = false)
- } catch {
- case throwable: Throwable =>
- val warningMessage =
- s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
- s"it into Hive metastore in Spark SQL specific format."
- logWarning(warningMessage, throwable)
- val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(sparkSqlSpecificTable, ignoreIfExists = false)
- }
-
- case (None, message) =>
- logWarning(message)
- val hiveTable = newSparkSQLSpecificMetastoreTable()
- client.createTable(hiveTable, ignoreIfExists = false)
- }
- }
-
def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 4f9513389c..3e718826fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -103,18 +103,6 @@ private[sql] class HiveSessionCatalog(
metastoreCatalog.cachedDataSourceTables.invalidateAll()
}
- def createDataSourceTable(
- name: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- metastoreCatalog.createDataSourceTable(
- name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
- }
-
def hiveDefaultTableFilePath(name: TableIdentifier): String = {
metastoreCatalog.hiveDefaultTableFilePath(name)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index a22e19207e..636bfdebf7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -110,7 +110,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
experimentalMethods.extraStrategies ++ Seq(
FileSourceStrategy,
DataSourceStrategy,
- HiveDDLStrategy,
DDLStrategy,
SpecialLimits,
InMemoryScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2d36ddafe6..2bea32b144 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, CreateTempTableUsingAsSelect}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution._
private[hive] trait HiveStrategies {
@@ -83,27 +83,4 @@ private[hive] trait HiveStrategies {
Nil
}
}
-
- object HiveDDLStrategy extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTableUsing(
- tableIdent, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
- val cmd =
- CreateMetastoreDataSource(
- tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)
- ExecutedCommandExec(cmd) :: Nil
-
- case c: CreateTableUsingAsSelect if c.temporary =>
- val cmd = CreateTempTableUsingAsSelect(
- c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child)
- ExecutedCommandExec(cmd) :: Nil
-
- case c: CreateTableUsingAsSelect =>
- val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns,
- c.bucketSpec, c.mode, c.options, c.child)
- ExecutedCommandExec(cmd) :: Nil
-
- case _ => Nil
- }
- }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
deleted file mode 100644
index 6899f46eec..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.hadoop.hive.metastore.MetaStoreUtils
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.HiveSessionState
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-
-
-private[hive]
-case class CreateMetastoreDataSource(
- tableIdent: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- options: Map[String, String],
- allowExisting: Boolean,
- managedIfNoPath: Boolean) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- // Since we are saving metadata to metastore, we need to check if metastore supports
- // the table name and database name we have for this query. MetaStoreUtils.validateName
- // is the method used by Hive to check if a table name or a database name is valid for
- // the metastore.
- if (!MetaStoreUtils.validateName(tableIdent.table)) {
- throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
- s"metastore. Metastore only accepts table name containing characters, numbers and _.")
- }
- if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) {
- throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
- s"for metastore. Metastore only accepts database name containing " +
- s"characters, numbers and _.")
- }
-
- val tableName = tableIdent.unquotedString
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-
- if (sessionState.catalog.tableExists(tableIdent)) {
- if (allowExisting) {
- return Seq.empty[Row]
- } else {
- throw new AnalysisException(s"Table $tableName already exists.")
- }
- }
-
- var isExternal = true
- val optionsWithPath =
- if (!options.contains("path") && managedIfNoPath) {
- isExternal = false
- options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
- } else {
- options
- }
-
- // Create the relation to validate the arguments before writing the metadata to the metastore.
- DataSource(
- sqlContext = sqlContext,
- userSpecifiedSchema = userSpecifiedSchema,
- className = provider,
- bucketSpec = None,
- options = optionsWithPath).resolveRelation()
-
- sessionState.catalog.createDataSourceTable(
- tableIdent,
- userSpecifiedSchema,
- Array.empty[String],
- bucketSpec = None,
- provider,
- optionsWithPath,
- isExternal)
-
- Seq.empty[Row]
- }
-}
-
-private[hive]
-case class CreateMetastoreDataSourceAsSelect(
- tableIdent: TableIdentifier,
- provider: String,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- mode: SaveMode,
- options: Map[String, String],
- query: LogicalPlan) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- // Since we are saving metadata to metastore, we need to check if metastore supports
- // the table name and database name we have for this query. MetaStoreUtils.validateName
- // is the method used by Hive to check if a table name or a database name is valid for
- // the metastore.
- if (!MetaStoreUtils.validateName(tableIdent.table)) {
- throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
- s"metastore. Metastore only accepts table name containing characters, numbers and _.")
- }
- if (tableIdent.database.isDefined && !MetaStoreUtils.validateName(tableIdent.database.get)) {
- throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
- s"for metastore. Metastore only accepts database name containing " +
- s"characters, numbers and _.")
- }
-
- val tableName = tableIdent.unquotedString
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- var createMetastoreTable = false
- var isExternal = true
- val optionsWithPath =
- if (!options.contains("path")) {
- isExternal = false
- options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
- } else {
- options
- }
-
- var existingSchema = None: Option[StructType]
- if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
- // Check if we need to throw an exception or just return.
- mode match {
- case SaveMode.ErrorIfExists =>
- throw new AnalysisException(s"Table $tableName already exists. " +
- s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
- s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
- s"the existing data. " +
- s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
- case SaveMode.Ignore =>
- // Since the table already exists and the save mode is Ignore, we will just return.
- return Seq.empty[Row]
- case SaveMode.Append =>
- // Check if the specified data source match the data source of the existing table.
- val dataSource = DataSource(
- sqlContext = sqlContext,
- userSpecifiedSchema = Some(query.schema.asNullable),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- className = provider,
- options = optionsWithPath)
- // TODO: Check that options from the resolved relation match the relation that we are
- // inserting into (i.e. using the same compression).
-
- EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(tableIdent)) match {
- case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
- existingSchema = Some(l.schema)
- case o =>
- throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
- }
- case SaveMode.Overwrite =>
- sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
- // Need to create the table again.
- createMetastoreTable = true
- }
- } else {
- // The table does not exist. We need to create it in metastore.
- createMetastoreTable = true
- }
-
- val data = Dataset.ofRows(sqlContext, query)
- val df = existingSchema match {
- // If we are inserting into an existing table, just use the existing schema.
- case Some(s) => data.selectExpr(s.fieldNames: _*)
- case None => data
- }
-
- // Create the relation based on the data of df.
- val dataSource = DataSource(
- sqlContext,
- className = provider,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- options = optionsWithPath)
-
- val result = dataSource.write(mode, df)
-
- if (createMetastoreTable) {
- // We will use the schema of resolved.relation as the schema of the table (instead of
- // the schema of df). It is important since the nullability may be changed by the relation
- // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
- sessionState.catalog.createDataSourceTable(
- tableIdent,
- Some(result.schema),
- partitionColumns,
- bucketSpec,
- provider,
- optionsWithPath,
- isExternal)
- }
-
- // Refresh the cache of the table in the catalog.
- sessionState.catalog.refreshTable(tableIdent)
- Seq.empty[Row]
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 11165a7ebb..68244cdb11 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.DataTypeParser
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -699,8 +700,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
// Manually create a metastore data source table.
- sessionState.catalog.createDataSourceTable(
- name = TableIdentifier("wide_schema"),
+ CreateDataSourceTableUtils.createDataSourceTable(
+ sqlContext = sqlContext,
+ tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
@@ -907,8 +909,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTempDir { tempPath =>
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
- sessionState.catalog.createDataSourceTable(
- name = TableIdentifier("not_skip_hive_metadata"),
+ CreateDataSourceTableUtils.createDataSourceTable(
+ sqlContext = sqlContext,
+ tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
@@ -921,8 +924,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
.forall(column => DataTypeParser.parse(column.dataType) == StringType))
- sessionState.catalog.createDataSourceTable(
- name = TableIdentifier("skip_hive_metadata"),
+ CreateDataSourceTableUtils.createDataSourceTable(
+ sqlContext = sqlContext,
+ tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,