path: root/sql/core/src
author     Wenchen Fan <wenchen@databricks.com>  2016-08-21 22:23:14 -0700
committer  Yin Huai <yhuai@databricks.com>       2016-08-21 22:23:14 -0700
commit     b2074b664a9c269c4103760d40c4a14e7aeb1e83 (patch)
tree       58cf286848123d09fb9e29bc92a800b0fc91ef88 /sql/core/src
parent     91c2397684ab791572ac57ffb2a924ff058bb64f (diff)
[SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog
## What changes were proposed in this pull request?

Spark SQL does not have its own metastore yet and currently uses Hive's. However, the Hive metastore has several limitations (e.g. a cap on the number of columns, no case preservation, poor decimal type support), so we rely on some hacks to store data source table metadata in it, i.e. we put all of the information in table properties. This PR moves these hacks into `HiveExternalCatalog` and isolates the Hive-specific logic in one place.

Overview of changes:

1. **Before this PR**: we had to put the metadata (schema, partition columns, etc.) of a data source table into table properties before saving it to the external catalog, even when the external catalog does not use the Hive metastore (e.g. `InMemoryCatalog`). **After this PR**: the table-properties tricks live only in `HiveExternalCatalog`; callers no longer need to take care of them.
2. **Before this PR**: because the table-properties tricks were applied outside the external catalog, we also had to undo them when reading table metadata back from the external catalog, e.g. `DescribeTableCommand` read the schema and partition columns from table properties. **After this PR**: the table metadata read from the external catalog is exactly what was saved to it.

Bonus: we can now create a data source table through `SessionCatalog`, as long as a schema is specified.

Breaking changes: `schemaStringLengthThreshold` and `hive.default.rcfile.serde` are no longer configurable.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14155 from cloud-fan/catalog-table.
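To make the caller-side simplification concrete, here is a minimal sketch (not part of the patch itself) of what a command now does: it assembles a plain `CatalogTable` and hands it to the `SessionCatalog`, leaving any table-property encoding to the external catalog implementation. The `CatalogTable` fields mirror the new `CreateDataSourceTableCommand` code in the diff below; the helper name `buildDataSourceTable` is invented for illustration only.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{
  BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: describe a data source table as a CatalogTable.
// Note there is no schema JSON splitting and no "spark.sql.sources.*"
// property encoding at this layer; that now lives only in HiveExternalCatalog.
def buildDataSourceTable(
    tableIdent: TableIdentifier,
    schema: StructType,
    provider: String,
    partitionColumns: Seq[String],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    isExternal: Boolean): CatalogTable = {
  CatalogTable(
    identifier = tableIdent,
    tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
    storage = CatalogStorageFormat.empty.copy(properties = options),
    schema = schema,                        // stored as-is, no length threshold here
    provider = Some(provider),              // e.g. "parquet", "json"
    partitionColumnNames = partitionColumns,
    bucketSpec = bucketSpec)
}

// A command would then persist it through the session catalog, e.g.:
//   sessionState.catalog.createTable(buildDataSourceTable(...), ignoreIfExists = false)
```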
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                        |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala        | 255
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                           |  94
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                        |  59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala        |  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala           |  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala           |   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala       |   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala       |   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala                              |   6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala               |   6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                      | 110
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala                |   5
14 files changed, 92 insertions, 500 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 71c3bd31e0..e32d30178e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -971,7 +971,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Storage format
val defaultStorage: CatalogStorageFormat = {
val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile")
- val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf)
+ val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType)
CatalogStorageFormat(
locationUri = None,
inputFormat = defaultHiveSerde.flatMap(_.inputFormat)
@@ -1115,7 +1115,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitGenericFileFormat(
ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
val source = ctx.identifier.getText
- HiveSerDe.sourceToSerDe(source, conf) match {
+ HiveSerDe.sourceToSerDe(source) match {
case Some(s) =>
CatalogStorageFormat.empty.copy(
inputFormat = s.inputFormat,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7b028e72ed..7400a0e7bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,10 +17,6 @@
package org.apache.spark.sql.execution.command
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -28,7 +24,6 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types._
@@ -97,16 +92,19 @@ case class CreateDataSourceTableCommand(
}
}
- CreateDataSourceTableUtils.createDataSourceTable(
- sparkSession = sparkSession,
- tableIdent = tableIdent,
+ val table = CatalogTable(
+ identifier = tableIdent,
+ tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
schema = dataSource.schema,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- provider = provider,
- options = optionsWithPath,
- isExternal = isExternal)
-
+ provider = Some(provider),
+ partitionColumnNames = partitionColumns,
+ bucketSpec = bucketSpec
+ )
+
+ // We will return Nil or throw exception at the beginning if the table already exists, so when
+ // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
+ sessionState.catalog.createTable(table, ignoreIfExists = false)
Seq.empty[Row]
}
}
@@ -193,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
existingSchema = Some(l.schema)
case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
- existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata))
+ existingSchema = Some(s.metadata.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -233,15 +231,17 @@ case class CreateDataSourceTableAsSelectCommand(
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
- CreateDataSourceTableUtils.createDataSourceTable(
- sparkSession = sparkSession,
- tableIdent = tableIdent,
- schema = result.schema,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- provider = provider,
- options = optionsWithPath,
- isExternal = isExternal)
+ val schema = result.schema
+ val table = CatalogTable(
+ identifier = tableIdent,
+ tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+ schema = schema,
+ provider = Some(provider),
+ partitionColumnNames = partitionColumns,
+ bucketSpec = bucketSpec
+ )
+ sessionState.catalog.createTable(table, ignoreIfExists = false)
}
// Refresh the cache of the table in the catalog.
@@ -249,210 +249,3 @@ case class CreateDataSourceTableAsSelectCommand(
Seq.empty[Row]
}
}
-
-
-object CreateDataSourceTableUtils extends Logging {
-
- val DATASOURCE_PREFIX = "spark.sql.sources."
- val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
- val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
- val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
- val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
- val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
- val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
- val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
- val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
- val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
- val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
- val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
- val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
- val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
- val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
-
- def createDataSourceTable(
- sparkSession: SparkSession,
- tableIdent: TableIdentifier,
- schema: StructType,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- provider: String,
- options: Map[String, String],
- isExternal: Boolean): Unit = {
- val tableProperties = new mutable.HashMap[String, String]
- tableProperties.put(DATASOURCE_PROVIDER, provider)
-
- // Serialized JSON schema string may be too long to be stored into a single metastore table
- // property. In this case, we split the JSON string and store each part as a separate table
- // property.
- val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
- val schemaJsonString = schema.json
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
- parts.zipWithIndex.foreach { case (part, index) =>
- tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
- }
-
- if (partitionColumns.length > 0) {
- tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
- partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
- tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
- }
- }
-
- if (bucketSpec.isDefined) {
- val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
-
- tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
- tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
- bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
- tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
- }
-
- if (sortColumnNames.nonEmpty) {
- tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
- sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
- tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
- }
- }
- }
-
- val tableType = if (isExternal) {
- tableProperties.put("EXTERNAL", "TRUE")
- CatalogTableType.EXTERNAL
- } else {
- tableProperties.put("EXTERNAL", "FALSE")
- CatalogTableType.MANAGED
- }
-
- val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf)
- val dataSource =
- DataSource(
- sparkSession,
- userSpecifiedSchema = Some(schema),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- className = provider,
- options = options)
-
- def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
- CatalogTable(
- identifier = tableIdent,
- tableType = tableType,
- schema = new StructType,
- provider = Some(provider),
- storage = CatalogStorageFormat(
- locationUri = None,
- inputFormat = None,
- outputFormat = None,
- serde = None,
- compressed = false,
- properties = options
- ),
- properties = tableProperties.toMap)
- }
-
- def newHiveCompatibleMetastoreTable(
- relation: HadoopFsRelation,
- serde: HiveSerDe): CatalogTable = {
- assert(partitionColumns.isEmpty)
- assert(relation.partitionSchema.isEmpty)
-
- CatalogTable(
- identifier = tableIdent,
- tableType = tableType,
- storage = CatalogStorageFormat(
- locationUri = Some(relation.location.paths.map(_.toUri.toString).head),
- inputFormat = serde.inputFormat,
- outputFormat = serde.outputFormat,
- serde = serde.serde,
- compressed = false,
- properties = options
- ),
- schema = relation.schema,
- provider = Some(provider),
- properties = tableProperties.toMap,
- viewText = None)
- }
-
- // TODO: Support persisting partitioned data source relations in Hive compatible format
- val qualifiedTableName = tableIdent.quotedString
- val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean
- val resolvedRelation = dataSource.resolveRelation(checkPathExist = false)
- val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match {
- case _ if skipHiveMetadata =>
- val message =
- s"Persisting partitioned data source relation $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
-
- case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 &&
- relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
- val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
- val message =
- s"Persisting data source relation $qualifiedTableName with a single input path " +
- s"into Hive metastore in Hive compatible format. Input path: " +
- s"${relation.location.paths.head}."
- (Some(hiveTable), message)
-
- case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty =>
- val message =
- s"Persisting partitioned data source relation $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
- "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
- (None, message)
-
- case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty =>
- val message =
- s"Persisting bucketed data source relation $qualifiedTableName into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
- "Input path(s): " + relation.location.paths.mkString("\n", "\n", "")
- (None, message)
-
- case (Some(serde), relation: HadoopFsRelation) =>
- val message =
- s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
- "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
- s"Input paths: " + relation.location.paths.mkString("\n", "\n", "")
- (None, message)
-
- case (Some(serde), _) =>
- val message =
- s"Data source relation $qualifiedTableName is not a " +
- s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
- "in Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
-
- case _ =>
- val message =
- s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
- s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
- s"Spark SQL specific format, which is NOT compatible with Hive."
- (None, message)
- }
-
- (hiveCompatibleTable, logMessage) match {
- case (Some(table), message) =>
- // We first try to save the metadata of the table in a Hive compatible way.
- // If Hive throws an error, we fall back to save its metadata in the Spark SQL
- // specific way.
- try {
- logInfo(message)
- sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
- } catch {
- case NonFatal(e) =>
- val warningMessage =
- s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
- s"it into Hive metastore in Spark SQL specific format."
- logWarning(warningMessage, e)
- val table = newSparkSQLSpecificMetastoreTable()
- sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
- }
-
- case (None, message) =>
- logWarning(message)
- val table = newSparkSQLSpecificMetastoreTable()
- sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false)
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2eff9337bc..3817f919f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -27,10 +27,9 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
@@ -234,10 +233,8 @@ case class AlterTableSetPropertiesCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- val ident = if (isView) "VIEW" else "TABLE"
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
- DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident")
val table = catalog.getTableMetadata(tableName)
// This overrides old properties
val newTable = table.copy(properties = table.properties ++ properties)
@@ -264,10 +261,8 @@ case class AlterTableUnsetPropertiesCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- val ident = if (isView) "VIEW" else "TABLE"
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
- DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident")
val table = catalog.getTableMetadata(tableName)
if (!ifExists) {
propKeys.foreach { k =>
@@ -445,11 +440,11 @@ case class AlterTableRecoverPartitionsCommand(
if (!catalog.tableExists(tableName)) {
throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
}
- val table = catalog.getTableMetadata(tableName)
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on temporary tables: $tableName")
}
+ val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
@@ -458,7 +453,7 @@ case class AlterTableRecoverPartitionsCommand(
throw new AnalysisException(
s"Operation not allowed: $cmd only works on external tables: $tableName")
}
- if (!DDLUtils.isTablePartitioned(table)) {
+ if (table.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
}
@@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand(
object DDLUtils {
-
- def isDatasourceTable(props: Map[String, String]): Boolean = {
- props.contains(DATASOURCE_PROVIDER)
- }
-
def isDatasourceTable(table: CatalogTable): Boolean = {
- isDatasourceTable(table.properties)
+ table.provider.isDefined && table.provider.get != "hive"
}
/**
@@ -611,78 +601,4 @@ object DDLUtils {
case _ =>
})
}
-
- /**
- * If the given table properties (or SerDe properties) contains datasource properties,
- * throw an exception.
- */
- def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = {
- val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX))
- if (datasourceKeys.nonEmpty) {
- throw new AnalysisException(s"Operation not allowed: $operation property keys may not " +
- s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}")
- }
- }
-
- def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
- }
-
- // A persisted data source table always store its schema in the catalog.
- def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
- require(isDatasourceTable(metadata))
- val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted."
- val props = metadata.properties
- props.get(DATASOURCE_SCHEMA).map { schema =>
- // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
- // After SPARK-6024, we removed this flag.
- // Although we are not using spark.sql.sources.schema any more, we need to still support.
- DataType.fromJson(schema).asInstanceOf[StructType]
- } getOrElse {
- props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
- val parts = (0 until numParts.toInt).map { index =>
- val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
- if (part == null) {
- throw new AnalysisException(msgSchemaCorrupted +
- s" (missing part $index of the schema, $numParts parts are expected).")
- }
- part
- }
- // Stick all parts back to a single schema string.
- DataType.fromJson(parts.mkString).asInstanceOf[StructType]
- } getOrElse(throw new AnalysisException(msgSchemaCorrupted))
- }
- }
-
- private def getColumnNamesByType(
- props: Map[String, String], colType: String, typeName: String): Seq[String] = {
- require(isDatasourceTable(props))
-
- for {
- numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
- index <- 0 until numCols.toInt
- } yield props.getOrElse(
- s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
- throw new AnalysisException(
- s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
- )
- )
- }
-
- def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
- getColumnNamesByType(metadata.properties, "part", "partitioning columns")
- }
-
- def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
- if (isDatasourceTable(metadata)) {
- metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
- BucketSpec(
- numBuckets.toInt,
- getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
- getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
- }
- } else {
- None
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 720399ecc5..af2b5ffd1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -119,11 +119,9 @@ case class CreateTableLikeCommand(
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE")
sparkSession.sessionState.catalog.createTable(table, ifNotExists)
Seq.empty[Row]
}
-
}
@@ -414,8 +412,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
describeSchema(catalog.lookupRelation(table).schema, result)
} else {
val metadata = catalog.getTableMetadata(table)
+ describeSchema(metadata.schema, result)
- describeSchema(metadata, result)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
@@ -429,20 +427,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- if (DDLUtils.isDatasourceTable(table)) {
- val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table)
- if (partColNames.nonEmpty) {
- val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table)
- append(buffer, "# Partition Information", "", "")
- append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
- describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer)
- }
- } else {
- if (table.partitionColumnNames.nonEmpty) {
- append(buffer, "# Partition Information", "", "")
- append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
- describeSchema(table.partitionSchema, buffer)
- }
+ if (table.partitionColumnNames.nonEmpty) {
+ append(buffer, "# Partition Information", "", "")
+ append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+ describeSchema(table.partitionSchema, buffer)
}
}
@@ -466,11 +454,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
append(buffer, "Table Type:", table.tableType.name, "")
append(buffer, "Table Parameters:", "", "")
- table.properties.filterNot {
- // Hides schema properties that hold user-defined schema, partition columns, and bucketing
- // information since they are already extracted and shown in other parts.
- case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA)
- }.foreach { case (key, value) =>
+ table.properties.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
@@ -493,7 +477,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match {
+ metadata.bucketSpec match {
case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
append(buffer, "Num Buckets:", numBuckets.toString, "")
append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
@@ -501,23 +485,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
case _ =>
}
-
- if (DDLUtils.isDatasourceTable(metadata)) {
- appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata))
- } else {
- appendBucketInfo(metadata.bucketSpec)
- }
- }
-
- private def describeSchema(
- tableDesc: CatalogTable,
- buffer: ArrayBuffer[Row]): Unit = {
- if (DDLUtils.isDatasourceTable(tableDesc)) {
- val schema = DDLUtils.getSchemaFromTableProperties(tableDesc)
- describeSchema(schema, buffer)
- } else {
- describeSchema(tableDesc.schema, buffer)
- }
}
private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
@@ -678,7 +645,7 @@ case class ShowPartitionsCommand(
s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}")
}
- if (!DDLUtils.isTablePartitioned(tab)) {
+ if (tab.partitionColumnNames.isEmpty) {
throw new AnalysisException(
s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
}
@@ -729,6 +696,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
val tableMetadata = catalog.getTableMetadata(table)
+ // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
showCreateDataSourceTable(tableMetadata)
} else {
@@ -872,15 +840,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showDataSourceTableDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
- val schema = DDLUtils.getSchemaFromTableProperties(metadata)
- val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+ val columns = metadata.schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
builder ++= columns.mkString("(", ", ", ")\n")
}
private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
val props = metadata.properties
- builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n"
+ builder ++= s"USING ${metadata.provider.get}\n"
val dataSourceOptions = metadata.storage.properties.filterNot {
case (key, value) =>
@@ -900,12 +867,12 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showDataSourceTableNonDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
- val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata)
+ val partCols = metadata.partitionColumnNames
if (partCols.nonEmpty) {
builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
}
- DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec =>
+ metadata.bucketSpec.foreach { spec =>
if (spec.bucketColumnNames.nonEmpty) {
builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 733ba18528..5eba7df060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -204,24 +204,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
- val schema = DDLUtils.getSchemaFromTableProperties(table)
-
- // We only need names at here since userSpecifiedSchema we loaded from the metastore
- // contains partition columns. We can always get datatypes of partitioning columns
- // from userSpecifiedSchema.
- val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table)
-
- val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table)
-
- val options = table.storage.properties
val dataSource =
DataSource(
sparkSession,
- userSpecifiedSchema = Some(schema),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
- className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
- options = options)
+ userSpecifiedSchema = Some(table.schema),
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ className = table.provider.get,
+ options = table.storage.properties)
LogicalRelation(
dataSource.resolveRelation(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 447c237e3a..7880c7cfa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -48,6 +47,11 @@ private[datasources] case class WriteRelation(
prepareJobForWrite: Job => OutputWriterFactory,
bucketSpec: Option[BucketSpec])
+object WriterContainer {
+ val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
+ val DATASOURCE_OUTPUTPATH = "spark.sql.sources.output.path"
+}
+
private[datasources] abstract class BaseWriterContainer(
@transient val relation: WriteRelation,
@transient private val job: Job,
@@ -94,7 +98,7 @@ private[datasources] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
- job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
+ job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -244,7 +248,7 @@ private[datasources] class DefaultWriterContainer(
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
val configuration = taskAttemptContext.getConfiguration
- configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
+ configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath)
var writer = newOutputWriter(getWorkPath)
writer.initConverter(dataSchema)
@@ -352,10 +356,12 @@ private[datasources] class DynamicPartitionWriterContainer(
val configuration = taskAttemptContext.getConfiguration
val path = if (partitionColumns.nonEmpty) {
val partitionPath = getPartitionString(key).getString(0)
- configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString)
+ configuration.set(
+ WriterContainer.DATASOURCE_OUTPUTPATH,
+ new Path(outputPath, partitionPath).toString)
new Path(getWorkPath, partitionPath).toString
} else {
- configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
+ configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath)
getWorkPath
}
val bucketId = getBucketIdFromKey(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 6b2f9fc61e..de2d633c0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,8 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
import org.apache.spark.sql.types._
object CSVRelation extends Logging {
@@ -192,7 +191,7 @@ private[csv] class CsvOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+ val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 27910e2cdd..16150b91d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -164,7 +163,7 @@ private[json] class JsonOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+ val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9c4778acf5..9208c82179 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
@@ -547,8 +546,7 @@ private[parquet] class ParquetOutputWriter(
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(
- CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+ val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index abb6059f75..a0c3fd53fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
@@ -131,7 +130,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
+ val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index ad69137f74..52e648a917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -28,10 +28,9 @@ object HiveSerDe {
*
* @param source Currently the source abbreviation can be one of the following:
* SequenceFile, RCFile, ORC, PARQUET, and case insensitive.
- * @param conf SQLConf
* @return HiveSerDe associated with the specified source
*/
- def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = {
+ def sourceToSerDe(source: String): Option[HiveSerDe] = {
val serdeMap = Map(
"sequencefile" ->
HiveSerDe(
@@ -42,8 +41,7 @@ object HiveSerDe {
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
- serde = Option(conf.getConfString("hive.default.rcfile.serde",
- "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))),
+ serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")),
"orc" ->
HiveSerDe(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index be1bccbd99..8dd883b37b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -243,7 +243,7 @@ class DDLCommandSuite extends PlanTest {
allSources.foreach { s =>
val query = s"CREATE TABLE my_tab STORED AS $s"
val ct = parseAs[CreateTable](query)
- val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+ val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
@@ -276,7 +276,7 @@ class DDLCommandSuite extends PlanTest {
val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
if (supportedSources.contains(s)) {
val ct = parseAs[CreateTable](query)
- val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+ val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde == Some("anything"))
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
@@ -295,7 +295,7 @@ class DDLCommandSuite extends PlanTest {
val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
if (supportedSources.contains(s)) {
val ct = parseAs[CreateTable](query)
- val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+ val hiveSerde = HiveSerDe.sourceToSerDe(s)
assert(hiveSerde.isDefined)
assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0f7fda7666..e6ae42258d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -93,7 +92,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
- provider = Some("parquet"),
+ provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -277,10 +276,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
""".stripMargin)
val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
- assert(expectedSchema ==
- DDLUtils.getSchemaFromTableProperties(tableMetadata))
- assert(expectedPartitionCols ==
- DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata))
+ assert(expectedSchema == tableMetadata.schema)
+ assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
}
}
@@ -399,41 +396,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e.message == "Found duplicate column(s) in bucket: a")
}
- test("Describe Table with Corrupted Schema") {
- import testImplicits._
-
- val tabName = "tab1"
- withTempPath { dir =>
- val path = dir.getCanonicalPath
- val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2")
- df.write.format("json").save(path)
- val uri = dir.toURI
-
- withTable(tabName) {
- sql(
- s"""
- |CREATE TABLE $tabName
- |USING json
- |OPTIONS (
- | path '$uri'
- |)
- """.stripMargin)
-
- val catalog = spark.sessionState.catalog
- val table = catalog.getTableMetadata(TableIdentifier(tabName))
- val newProperties = table.properties.filterKeys(key =>
- key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS)
- val newTable = table.copy(properties = newProperties)
- catalog.alterTable(newTable)
-
- val e = intercept[AnalysisException] {
- sql(s"DESC $tabName")
- }.getMessage
- assert(e.contains(s"Could not read schema from the metastore because it is corrupted"))
- }
- }
- }
-
test("Refresh table after changing the data source table partitioning") {
import testImplicits._
@@ -460,10 +422,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|)
""".stripMargin)
val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName))
- val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata)
- assert(tableSchema == schema)
- val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)
- assert(partCols == partitionCols)
+ assert(tableMetadata.schema == schema)
+ assert(tableMetadata.partitionColumnNames == partitionCols)
// Change the schema
val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
@@ -472,23 +432,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// No change on the schema
val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
- val tableSchemaBeforeRefresh =
- DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh)
- assert(tableSchemaBeforeRefresh == schema)
- val partColsBeforeRefresh =
- DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
- assert(partColsBeforeRefresh == partitionCols)
+ assert(tableMetadataBeforeRefresh.schema == schema)
+ assert(tableMetadataBeforeRefresh.partitionColumnNames == partitionCols)
// Refresh does not affect the schema
spark.catalog.refreshTable(tabName)
val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
- val tableSchemaAfterRefresh =
- DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
- assert(tableSchemaAfterRefresh == schema)
- val partColsAfterRefresh =
- DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
- assert(partColsAfterRefresh == partitionCols)
+ assert(tableMetadataAfterRefresh.schema == schema)
+ assert(tableMetadataAfterRefresh.partitionColumnNames == partitionCols)
}
}
}
@@ -641,7 +593,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.schema == new StructType().add("a", "int").add("b", "int"))
- assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
+ assert(table.provider == Some("parquet"))
}
}
@@ -651,12 +603,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)")
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
- assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
- assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
- assert(DDLUtils.getSchemaFromTableProperties(table) ==
- new StructType().add("a", IntegerType).add("b", IntegerType))
- assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
- Seq("a"))
+ assert(table.provider == Some("parquet"))
+ assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+ assert(table.partitionColumnNames == Seq("a"))
}
}
@@ -667,12 +616,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS")
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
- assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
- assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
- assert(DDLUtils.getSchemaFromTableProperties(table) ==
- new StructType().add("a", IntegerType).add("b", IntegerType))
- assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
- Some(BucketSpec(5, Seq("a"), Seq("b"))))
+ assert(table.provider == Some("parquet"))
+ assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+ assert(table.bucketSpec == Some(BucketSpec(5, Seq("a"), Seq("b"))))
}
}
@@ -1096,7 +1042,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog: SessionCatalog,
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
- properties = Map(DATASOURCE_PROVIDER -> "csv")))
+ provider = Some("csv")))
}
private def testSetProperties(isDatasourceTable: Boolean): Unit = {
@@ -1108,9 +1054,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
- !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
- }
+ catalog.getTableMetadata(tableIdent).properties
}
assert(getProps.isEmpty)
// set table properties
@@ -1124,11 +1068,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
}
- // datasource table property keys are not allowed
- val e = intercept[AnalysisException] {
- sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
- }
- assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}
private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
@@ -1140,9 +1079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
- catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
- !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
- }
+ catalog.getTableMetadata(tableIdent).properties
}
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
@@ -1164,11 +1101,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// property to unset does not exist, but "IF EXISTS" is specified
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
assert(getProps == Map("x" -> "y"))
- // datasource table property keys are not allowed
- val e2 = intercept[AnalysisException] {
- sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
- }
- assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
@@ -1573,10 +1505,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- test("create table with datasource properties (not allowed)") {
- assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
- }
-
test("Create Hive Table As Select") {
import testImplicits._
withTable("t", "t1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 49153f7736..729c9fdda5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -201,7 +201,7 @@ class CreateTableAsSelectSuite
""".stripMargin
)
val table = catalog.getTableMetadata(TableIdentifier("t"))
- assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a"))
+ assert(table.partitionColumnNames == Seq("a"))
}
}
@@ -217,8 +217,7 @@ class CreateTableAsSelectSuite
""".stripMargin
)
val table = catalog.getTableMetadata(TableIdentifier("t"))
- assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
- Option(BucketSpec(5, Seq("a"), Seq("b"))))
+ assert(table.bucketSpec == Option(BucketSpec(5, Seq("a"), Seq("b"))))
}
}