path: root/sql/hive/src/main
author: Wenchen Fan <wenchen@databricks.com>  2016-08-21 22:23:14 -0700
committer: Yin Huai <yhuai@databricks.com>  2016-08-21 22:23:14 -0700
commit: b2074b664a9c269c4103760d40c4a14e7aeb1e83 (patch)
tree: 58cf286848123d09fb9e29bc92a800b0fc91ef88 /sql/hive/src/main
parent: 91c2397684ab791572ac57ffb2a924ff058bb64f (diff)
[SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog
## What changes were proposed in this pull request?

Spark SQL doesn't have its own metastore yet and currently uses Hive's. However, Hive's metastore has some limitations (e.g. columns can't be too many, it is not case-preserving, it has poor decimal type support, etc.), so we use some hacks to store data source table metadata in the Hive metastore, i.e. we put all the information into table properties. This PR moves these hacks into `HiveExternalCatalog` to isolate the Hive-specific logic in one place.

Overview of the changes:

1. **Before this PR**: we had to put the metadata (schema, partition columns, etc.) of data source tables into table properties before saving them to the external catalog, even when the external catalog doesn't use the Hive metastore (e.g. `InMemoryCatalog`). **After this PR**: the table-properties tricks live only in `HiveExternalCatalog`; the caller side no longer needs to take care of them.
2. **Before this PR**: because the table-properties tricks were done outside of the external catalog, we also had to undo them when reading table metadata back from the external catalog, e.g. `DescribeTableCommand` read the schema and partition columns from table properties. **After this PR**: the table metadata read from the external catalog is exactly what we saved to it.

Bonus: we can now create a data source table using `SessionCatalog`, if a schema is specified.

Breaking changes: `schemaStringLengthThreshold` is no longer configurable; `hive.default.rcfile.serde` is no longer configurable.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14155 from cloud-fan/catalog-table.
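To make the table-properties trick concrete, here is a minimal sketch, not part of this commit, of how a data source table's schema gets encoded into Hive table properties. The object and method names (`SchemaPropertiesSketch`, `encode`) are hypothetical; the property keys and the 4000-character split mirror the constants and threshold added to `HiveExternalCatalog` in the diff below.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper illustrating the encoding done in createTable: the schema is
// serialized to JSON and split across numbered table properties, because a single
// Hive metastore property value has a limited length.
object SchemaPropertiesSketch {
  private val prefix = "spark.sql.sources."

  def encode(provider: String, schema: StructType): Map[String, String] = {
    // The threshold is hard-coded to 4000 in this PR (see the TODO in the diff).
    val parts = schema.json.grouped(4000).toSeq
    val partProps = parts.zipWithIndex.map { case (part, i) =>
      s"${prefix}schema.part.$i" -> part
    }
    Map(
      s"${prefix}provider" -> provider,
      s"${prefix}schema.numParts" -> parts.size.toString) ++ partProps
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("id", StringType)))
    encode("parquet", schema).foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```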
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala    328
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala   16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala        3
4 files changed, 327 insertions, 87 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8302e3e98a..de3e60a44d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -30,7 +30,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.types.{DataType, StructType}
/**
@@ -41,6 +45,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
extends ExternalCatalog with Logging {
import CatalogTypes.TablePartitionSpec
+ import HiveExternalCatalog._
+ import CatalogTableType._
// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(
@@ -81,6 +87,20 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
withClient { getTable(db, table) }
}
+ /**
+ * If the given table properties contain datasource properties, throw an exception. We do
+ * this check when creating or altering a table, i.e. when we try to write table metadata to the
+ * Hive metastore.
+ */
+ private def verifyTableProperties(table: CatalogTable): Unit = {
+ val datasourceKeys = table.properties.keys.filter(_.startsWith(DATASOURCE_PREFIX))
+ if (datasourceKeys.nonEmpty) {
+ throw new AnalysisException(s"Cannot persist ${table.qualifiedName} into Hive metastore " +
+ s"as table property keys may not start with '$DATASOURCE_PREFIX': " +
+ datasourceKeys.mkString("[", ", ", "]"))
+ }
+ }
+
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -144,16 +164,162 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireDbExists(db)
+ verifyTableProperties(tableDefinition)
+
+ // Before saving data source table metadata into Hive metastore, we should:
+ // 1. Put table schema, partition column names and bucket specification in table properties.
+ // 2. Check if this table is hive compatible
+ // 2.1 If it's not hive compatible, set schema, partition columns and bucket spec to empty
+ // and save table metadata to Hive.
+ // 2.2 If it's hive compatible, set serde information in table metadata and try to save
+ // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+ if (DDLUtils.isDatasourceTable(tableDefinition)) {
+ // A data source table always has a provider; this is guaranteed by `DDLUtils.isDatasourceTable`.
+ val provider = tableDefinition.provider.get
+ val partitionColumns = tableDefinition.partitionColumnNames
+ val bucketSpec = tableDefinition.bucketSpec
+
+ val tableProperties = new scala.collection.mutable.HashMap[String, String]
+ tableProperties.put(DATASOURCE_PROVIDER, provider)
+
+ // The serialized JSON schema string may be too long to be stored in a single metastore table
+ // property. In this case, we split the JSON string and store each part as a separate table
+ // property.
+ // TODO: the threshold should be set by `spark.sql.sources.schemaStringLengthThreshold`,
+ // however the current SQLConf is session-isolated, which is not applicable to the external
+ // catalog. We should re-enable this conf instead of hard-coding the value here, once we have
+ // a global SQLConf.
+ val threshold = 4000
+ val schemaJsonString = tableDefinition.schema.json
+ // Split the JSON string.
+ val parts = schemaJsonString.grouped(threshold).toSeq
+ tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
+ parts.zipWithIndex.foreach { case (part, index) =>
+ tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
+ }
+
+ if (partitionColumns.nonEmpty) {
+ tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
+ partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
+ tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
+ }
+ }
+
+ if (bucketSpec.isDefined) {
+ val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+ tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
+ bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+ tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
+ }
+
+ if (sortColumnNames.nonEmpty) {
+ tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
+ sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+ tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
+ }
+ }
+ }
+
+ // Converts the table metadata to the Spark SQL specific format, i.e. sets the schema, partition
+ // column names and bucket specification to empty.
+ def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+ tableDefinition.copy(
+ schema = new StructType,
+ partitionColumnNames = Nil,
+ bucketSpec = None,
+ properties = tableDefinition.properties ++ tableProperties)
+ }
+
+ // Converts the table metadata to a Hive compatible format, i.e. sets the serde information.
+ def newHiveCompatibleMetastoreTable(serde: HiveSerDe, path: String): CatalogTable = {
+ tableDefinition.copy(
+ storage = tableDefinition.storage.copy(
+ locationUri = Some(new Path(path).toUri.toString),
+ inputFormat = serde.inputFormat,
+ outputFormat = serde.outputFormat,
+ serde = serde.serde
+ ),
+ properties = tableDefinition.properties ++ tableProperties)
+ }
+
+ val qualifiedTableName = tableDefinition.identifier.quotedString
+ val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
+ val maybePath = new CaseInsensitiveMap(tableDefinition.storage.properties).get("path")
+ val skipHiveMetadata = tableDefinition.storage.properties
+ .getOrElse("skipHiveMetadata", "false").toBoolean
+
+ val (hiveCompatibleTable, logMessage) = (maybeSerde, maybePath) match {
+ case _ if skipHiveMetadata =>
+ val message =
+ s"Persisting data source table $qualifiedTableName into Hive metastore in" +
+ "Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+
+ // our bucketing is not compatible with Hive's (it uses a different hash function)
+ case _ if tableDefinition.bucketSpec.nonEmpty =>
+ val message =
+ s"Persisting bucketed data source table $qualifiedTableName into " +
+ "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
+ (None, message)
+
+ case (Some(serde), Some(path)) =>
+ val message =
+ s"Persisting file based data source table $qualifiedTableName with an input path " +
+ s"into Hive metastore in Hive compatible format."
+ (Some(newHiveCompatibleMetastoreTable(serde, path)), message)
+
+ case (Some(_), None) =>
+ val message =
+ s"Data source table $qualifiedTableName is not file based. Persisting it into " +
+ s"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+
+ case _ =>
+ val provider = tableDefinition.provider.get
+ val message =
+ s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+ s"Persisting data source table $qualifiedTableName into Hive metastore in " +
+ s"Spark SQL specific format, which is NOT compatible with Hive."
+ (None, message)
+ }
+
+ (hiveCompatibleTable, logMessage) match {
+ case (Some(table), message) =>
+ // We first try to save the metadata of the table in a Hive compatible way.
+ // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+ // specific way.
+ try {
+ logInfo(message)
+ saveTableIntoHive(table, ignoreIfExists)
+ } catch {
+ case NonFatal(e) =>
+ val warningMessage =
+ s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " +
+ "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+ logWarning(warningMessage, e)
+ saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+ }
+
+ case (None, message) =>
+ logWarning(message)
+ saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+ }
+ } else {
+ client.createTable(tableDefinition, ignoreIfExists)
+ }
+ }
- if (
+ private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ assert(DDLUtils.isDatasourceTable(tableDefinition),
+ "saveTableIntoHive only takes data source table.")
// If this is an external data source table...
- tableDefinition.properties.contains("spark.sql.sources.provider") &&
- tableDefinition.tableType == CatalogTableType.EXTERNAL &&
- // ... that is not persisted as Hive compatible format (external tables in Hive compatible
- // format always set `locationUri` to the actual data location and should NOT be hacked as
- // following.)
- tableDefinition.storage.locationUri.isEmpty
- ) {
+ if (tableDefinition.tableType == EXTERNAL &&
+ // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+ // format always set `locationUri` to the actual data location and should NOT be hacked as
+ // following.)
+ tableDefinition.storage.locationUri.isEmpty) {
// !! HACK ALERT !!
//
// Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
@@ -200,22 +366,79 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
* Alter a table whose name that matches the one specified in `tableDefinition`,
* assuming the table exists.
*
- * Note: As of now, this only supports altering table properties, serde properties,
- * and num buckets!
+ * Note: As of now, this doesn't support altering table schema, partition column names and bucket
+ * specification. We will ignore them even if users do specify different values for these fields.
*/
override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
requireTableExists(db, tableDefinition.identifier.table)
- client.alterTable(tableDefinition)
+ verifyTableProperties(tableDefinition)
+
+ if (DDLUtils.isDatasourceTable(tableDefinition)) {
+ val oldDef = client.getTable(db, tableDefinition.identifier.table)
+ // Set the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
+ // to retain the Spark-specific format if the table uses it. Also add the old data source
+ // properties to the table properties, to retain the data source table format.
+ val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+ val newDef = tableDefinition.copy(
+ schema = oldDef.schema,
+ partitionColumnNames = oldDef.partitionColumnNames,
+ bucketSpec = oldDef.bucketSpec,
+ properties = oldDataSourceProps ++ tableDefinition.properties)
+
+ client.alterTable(newDef)
+ } else {
+ client.alterTable(tableDefinition)
+ }
}
override def getTable(db: String, table: String): CatalogTable = withClient {
- client.getTable(db, table)
+ restoreTableMetadata(client.getTable(db, table))
}
override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
- client.getTableOption(db, table)
+ client.getTableOption(db, table).map(restoreTableMetadata)
+ }
+
+ /**
+ * Restores table metadata from the table properties if it's a datasource table. This method is
+ * roughly the inverse of [[createTable]].
+ *
+ * It reads the table schema, provider, partition column names and bucket specification from the
+ * table properties, and filters these special entries out of the table properties.
+ */
+ private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
+ if (table.tableType == VIEW) {
+ table
+ } else {
+ getProviderFromTableProperties(table).map { provider =>
+ assert(provider != "hive", "Hive serde table should not save provider in table properties.")
+ // SPARK-15269: Persisted data source tables always store the location URI as a storage
+ // property named "path" instead of standard Hive `dataLocation`, because Hive only
+ // allows directory paths as location URIs while Spark SQL data source tables also
+ // allow file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+ // data source tables.
+ // Spark SQL may also save an external data source table in a Hive compatible format when
+ // possible, so that such tables can be directly accessed by Hive. For these tables,
+ // `dataLocation` is still necessary. Here we also check for input format because only
+ // these Hive compatible tables set this field.
+ val storage = if (table.tableType == EXTERNAL && table.storage.inputFormat.isEmpty) {
+ table.storage.copy(locationUri = None)
+ } else {
+ table.storage
+ }
+ table.copy(
+ storage = storage,
+ schema = getSchemaFromTableProperties(table),
+ provider = Some(provider),
+ partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+ bucketSpec = getBucketSpecFromTableProperties(table),
+ properties = getOriginalTableProperties(table))
+ } getOrElse {
+ table.copy(provider = Some("hive"))
+ }
+ }
}
override def tableExists(db: String, table: String): Boolean = withClient {
@@ -363,3 +586,82 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
}
}
+
+object HiveExternalCatalog {
+ val DATASOURCE_PREFIX = "spark.sql.sources."
+ val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
+ val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
+ val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
+ val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
+ val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+ val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
+ val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
+ val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
+ val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
+ val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
+ val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
+ val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+
+ def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
+ metadata.properties.get(DATASOURCE_PROVIDER)
+ }
+
+ def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
+ metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+ }
+
+ // A persisted data source table always stores its schema in the catalog.
+ def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
+ val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
+ val props = metadata.properties
+ props.get(DATASOURCE_SCHEMA).map { schema =>
+ // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
+ // After SPARK-6024, we removed this flag.
+ // Although we are not using `spark.sql.sources.schema` any more, we still need to support it.
+ DataType.fromJson(schema).asInstanceOf[StructType]
+ } getOrElse {
+ props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
+ val parts = (0 until numParts.toInt).map { index =>
+ val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
+ if (part == null) {
+ throw new AnalysisException(errorMessage +
+ s" (missing part $index of the schema, $numParts parts are expected).")
+ }
+ part
+ }
+ // Stick all parts back to a single schema string.
+ DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+ } getOrElse {
+ throw new AnalysisException(errorMessage)
+ }
+ }
+ }
+
+ private def getColumnNamesByType(
+ props: Map[String, String],
+ colType: String,
+ typeName: String): Seq[String] = {
+ for {
+ numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+ index <- 0 until numCols.toInt
+ } yield props.getOrElse(
+ s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
+ throw new AnalysisException(
+ s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+ )
+ )
+ }
+
+ def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
+ getColumnNamesByType(metadata.properties, "part", "partitioning columns")
+ }
+
+ def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
+ metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
+ BucketSpec(
+ numBuckets.toInt,
+ getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
+ getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
+ }
+ }
+}
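And the reverse direction: a simplified sketch of the schema restore performed by `getSchemaFromTableProperties` above, assuming the schema was stored as split parts and that no part is missing (the legacy single-property format and the corruption error handling are omitted; `restoreSchema` is an illustrative name, not part of this commit).

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Reassemble the JSON schema that createTable split across numbered table
// properties, then parse it back into a StructType.
def restoreSchema(props: Map[String, String]): StructType = {
  val numParts = props("spark.sql.sources.schema.numParts").toInt
  val json = (0 until numParts)
    .map(i => props(s"spark.sql.sources.schema.part.$i"))
    .mkString
  DataType.fromJson(json).asInstanceOf[StructType]
}
```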
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7118edabb8..181f470b2a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -68,64 +68,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
- val table = client.getTable(in.database, in.name)
+ val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
- // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
-
- def schemaStringFromParts: Option[String] = {
- table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
- val parts = (0 until numParts.toInt).map { index =>
- val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
- if (part == null) {
- throw new AnalysisException(
- "Could not read schema from the metastore because it is corrupted " +
- s"(missing part $index of the schema, $numParts parts are expected).")
- }
-
- part
- }
- // Stick all parts back to a single schema string.
- parts.mkString
- }
- }
-
- def getColumnNames(colType: String): Seq[String] = {
- table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
- numCols => (0 until numCols.toInt).map { index =>
- table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
- throw new AnalysisException(
- s"Could not read $colType columns from the metastore because it is corrupted " +
- s"(missing part $index of it, $numCols parts are expected)."))
- }
- }.getOrElse(Nil)
- }
-
- // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
- // After SPARK-6024, we removed this flag.
- // Although we are not using spark.sql.sources.schema any more, we need to still support.
- val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
-
- val userSpecifiedSchema =
- schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
-
- // We only need names at here since userSpecifiedSchema we loaded from the metastore
- // contains partition columns. We can always get data types of partitioning columns
- // from userSpecifiedSchema.
- val partitionColumns = getColumnNames("part")
-
- val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
- BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
- }
-
- val options = table.storage.properties
val dataSource =
DataSource(
sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- className = table.properties(DATASOURCE_PROVIDER),
- options = options)
+ userSpecifiedSchema = Some(table.schema),
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ className = table.provider.get,
+ options = table.storage.properties)
LogicalRelation(
dataSource.resolveRelation(checkPathExist = true),
@@ -158,9 +110,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
val qualifiedTableName = getQualifiedTableName(tableIdent)
- val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
+ val table = sparkSession.sharedState.externalCatalog.getTable(
+ qualifiedTableName.database, qualifiedTableName.name)
- if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
+ if (DDLUtils.isDatasourceTable(table)) {
val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f8204e183f..9b7afd4628 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -392,20 +391,7 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
- locationUri = shim.getDataLocation(h).filterNot { _ =>
- // SPARK-15269: Persisted data source tables always store the location URI as a SerDe
- // property named "path" instead of standard Hive `dataLocation`, because Hive only
- // allows directory paths as location URIs while Spark SQL data source tables also
- // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
- // data source tables.
- DDLUtils.isDatasourceTable(properties) &&
- h.getTableType == HiveTableType.EXTERNAL_TABLE &&
- // Spark SQL may also save external data source in Hive compatible format when
- // possible, so that these tables can be directly accessed by Hive. For these tables,
- // `dataLocation` is still necessary. Here we also check for input format class
- // because only these Hive compatible tables set this field.
- h.getInputFormatClass == null
- },
+ locationUri = shim.getDataLocation(h),
inputFormat = Option(h.getInputFormatClass).map(_.getName),
outputFormat = Option(h.getOutputFormatClass).map(_.getName),
serde = Option(h.getSerializationLib),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index c74d948a6f..286197b50e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
@@ -222,7 +221,7 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val uniqueWriteJobId = conf.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+ val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")