From b2074b664a9c269c4103760d40c4a14e7aeb1e83 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Sun, 21 Aug 2016 22:23:14 -0700
Subject: [SPARK-16498][SQL] move hive hack for data source table into HiveExternalCatalog

## What changes were proposed in this pull request?

Spark SQL does not have its own metastore yet and currently uses Hive's. However, the Hive metastore has several limitations (e.g. a cap on the number of columns, no case preservation, poor decimal type support), so we rely on hacks to store data source table metadata in it, namely packing all of the information into table properties. This PR moves these hacks into `HiveExternalCatalog`, isolating the Hive-specific logic in one place.

Overview of the changes:

1. **Before this PR**: we had to put the metadata of data source tables (schema, partition columns, etc.) into table properties before saving to the external catalog, even when the external catalog does not use the Hive metastore (e.g. `InMemoryCatalog`).
   **After this PR**: the table-properties tricks live only in `HiveExternalCatalog`; callers no longer need to care about them.

2. **Before this PR**: because the table-properties tricks were applied outside the external catalog, we also had to undo them whenever table metadata was read back from the external catalog, e.g. `DescribeTableCommand` read the schema and partition columns from table properties.
   **After this PR**: the table metadata read from the external catalog is exactly what was saved to it.

Bonus: we can now create a data source table through `SessionCatalog`, as long as a schema is specified.

Breaking changes:
- `schemaStringLengthThreshold` is no longer configurable.
- `hive.default.rcfile.serde` is no longer configurable.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan

Closes #14155 from cloud-fan/catalog-table.
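To make point 1 concrete, here is a minimal sketch (not verbatim from the patch; the helper name and parameter list are illustrative) of how a data source table is persisted after this PR: the caller builds a plain `CatalogTable` and hands it to `SessionCatalog.createTable`, with no `spark.sql.sources.*` property tricks on the caller side.

```scala
// Sketch only: mirrors the new CatalogTable construction in
// CreateDataSourceTableCommand as changed by this patch.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

def buildDataSourceTable(
    tableIdent: TableIdentifier,
    schema: StructType,
    provider: String,
    partitionColumns: Seq[String],
    optionsWithPath: Map[String, String],
    isExternal: Boolean): CatalogTable = {
  CatalogTable(
    identifier = tableIdent,
    tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
    // Data source options (including the path) are kept as storage properties.
    storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
    schema = schema,
    provider = Some(provider),
    partitionColumnNames = partitionColumns)
}

// Usage (sketch): the table is then handed to the catalog as-is; only
// HiveExternalCatalog still knows about the table-properties encoding.
// sessionState.catalog.createTable(buildDataSourceTable(...), ignoreIfExists = false)
```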
--- .../spark/sql/execution/SparkSqlParser.scala | 4 +- .../execution/command/createDataSourceTables.scala | 255 ++------------------- .../apache/spark/sql/execution/command/ddl.scala | 94 +------- .../spark/sql/execution/command/tables.scala | 59 ++--- .../execution/datasources/DataSourceStrategy.scala | 22 +- .../execution/datasources/WriterContainer.scala | 16 +- .../execution/datasources/csv/CSVRelation.scala | 5 +- .../datasources/json/JsonFileFormat.scala | 3 +- .../datasources/parquet/ParquetFileFormat.scala | 4 +- .../datasources/text/TextFileFormat.scala | 3 +- .../org/apache/spark/sql/internal/HiveSerDe.scala | 6 +- .../sql/execution/command/DDLCommandSuite.scala | 6 +- .../spark/sql/execution/command/DDLSuite.scala | 110 ++------- .../sql/sources/CreateTableAsSelectSuite.scala | 5 +- 14 files changed, 92 insertions(+), 500 deletions(-) (limited to 'sql/core/src') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 71c3bd31e0..e32d30178e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -971,7 +971,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // Storage format val defaultStorage: CatalogStorageFormat = { val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType) CatalogStorageFormat( locationUri = None, inputFormat = defaultHiveSerde.flatMap(_.inputFormat) @@ -1115,7 +1115,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitGenericFileFormat( ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { val source = ctx.identifier.getText - HiveSerDe.sourceToSerDe(source, conf) match { + HiveSerDe.sourceToSerDe(source) match { case Some(s) => CatalogStorageFormat.empty.copy( inputFormat = s.inputFormat, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 7b028e72ed..7400a0e7bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -17,10 +17,6 @@ package org.apache.spark.sql.execution.command -import scala.collection.mutable -import scala.util.control.NonFatal - -import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases @@ -28,7 +24,6 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} import org.apache.spark.sql.types._ @@ -97,16 +92,19 @@ case class CreateDataSourceTableCommand( } } - CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, + val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) 
CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), schema = dataSource.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, - isExternal = isExternal) - + provider = Some(provider), + partitionColumnNames = partitionColumns, + bucketSpec = bucketSpec + ) + + // We will return Nil or throw exception at the beginning if the table already exists, so when + // we reach here, the table should not exist and we should set `ignoreIfExists` to false. + sessionState.catalog.createTable(table, ignoreIfExists = false) Seq.empty[Row] } } @@ -193,7 +191,7 @@ case class CreateDataSourceTableAsSelectCommand( } existingSchema = Some(l.schema) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - existingSchema = Some(DDLUtils.getSchemaFromTableProperties(s.metadata)) + existingSchema = Some(s.metadata.schema) case o => throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } @@ -233,15 +231,17 @@ case class CreateDataSourceTableAsSelectCommand( // We will use the schema of resolved.relation as the schema of the table (instead of // the schema of df). It is important since the nullability may be changed by the relation // provider (for example, see org.apache.spark.sql.parquet.DefaultSource). - CreateDataSourceTableUtils.createDataSourceTable( - sparkSession = sparkSession, - tableIdent = tableIdent, - schema = result.schema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - provider = provider, - options = optionsWithPath, - isExternal = isExternal) + val schema = result.schema + val table = CatalogTable( + identifier = tableIdent, + tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath), + schema = schema, + provider = Some(provider), + partitionColumnNames = partitionColumns, + bucketSpec = bucketSpec + ) + sessionState.catalog.createTable(table, ignoreIfExists = false) } // Refresh the cache of the table in the catalog. @@ -249,210 +249,3 @@ case class CreateDataSourceTableAsSelectCommand( Seq.empty[Row] } } - - -object CreateDataSourceTableUtils extends Logging { - - val DATASOURCE_PREFIX = "spark.sql.sources." - val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" - val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID" - val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" - val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" - val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." - val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" - val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" - val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" - val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" - val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" - val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." - val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." - val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." - val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." 
- - def createDataSourceTable( - sparkSession: SparkSession, - tableIdent: TableIdentifier, - schema: StructType, - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], - provider: String, - options: Map[String, String], - isExternal: Boolean): Unit = { - val tableProperties = new mutable.HashMap[String, String] - tableProperties.put(DATASOURCE_PROVIDER, provider) - - // Serialized JSON schema string may be too long to be stored into a single metastore table - // property. In this case, we split the JSON string and store each part as a separate table - // property. - val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold - val schemaJsonString = schema.json - // Split the JSON string. - val parts = schemaJsonString.grouped(threshold).toSeq - tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) - parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) - } - - if (partitionColumns.length > 0) { - tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) - partitionColumns.zipWithIndex.foreach { case (partCol, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) - } - } - - if (bucketSpec.isDefined) { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get - - tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) - tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString) - bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol) - } - - if (sortColumnNames.nonEmpty) { - tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString) - sortColumnNames.zipWithIndex.foreach { case (sortCol, index) => - tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol) - } - } - } - - val tableType = if (isExternal) { - tableProperties.put("EXTERNAL", "TRUE") - CatalogTableType.EXTERNAL - } else { - tableProperties.put("EXTERNAL", "FALSE") - CatalogTableType.MANAGED - } - - val maybeSerDe = HiveSerDe.sourceToSerDe(provider, sparkSession.sessionState.conf) - val dataSource = - DataSource( - sparkSession, - userSpecifiedSchema = Some(schema), - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = provider, - options = options) - - def newSparkSQLSpecificMetastoreTable(): CatalogTable = { - CatalogTable( - identifier = tableIdent, - tableType = tableType, - schema = new StructType, - provider = Some(provider), - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = options - ), - properties = tableProperties.toMap) - } - - def newHiveCompatibleMetastoreTable( - relation: HadoopFsRelation, - serde: HiveSerDe): CatalogTable = { - assert(partitionColumns.isEmpty) - assert(relation.partitionSchema.isEmpty) - - CatalogTable( - identifier = tableIdent, - tableType = tableType, - storage = CatalogStorageFormat( - locationUri = Some(relation.location.paths.map(_.toUri.toString).head), - inputFormat = serde.inputFormat, - outputFormat = serde.outputFormat, - serde = serde.serde, - compressed = false, - properties = options - ), - schema = relation.schema, - provider = Some(provider), - properties = tableProperties.toMap, - viewText = None) - } - - // TODO: Support persisting partitioned data source relations in Hive compatible 
format - val qualifiedTableName = tableIdent.quotedString - val skipHiveMetadata = options.getOrElse("skipHiveMetadata", "false").toBoolean - val resolvedRelation = dataSource.resolveRelation(checkPathExist = false) - val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) match { - case _ if skipHiveMetadata => - val message = - s"Persisting partitioned data source relation $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - - case (Some(serde), relation: HadoopFsRelation) if relation.location.paths.length == 1 && - relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty => - val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) - val message = - s"Persisting data source relation $qualifiedTableName with a single input path " + - s"into Hive metastore in Hive compatible format. Input path: " + - s"${relation.location.paths.head}." - (Some(hiveTable), message) - - case (Some(serde), relation: HadoopFsRelation) if relation.partitionSchema.nonEmpty => - val message = - s"Persisting partitioned data source relation $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + - "Input path(s): " + relation.location.paths.mkString("\n", "\n", "") - (None, message) - - case (Some(serde), relation: HadoopFsRelation) if relation.bucketSpec.nonEmpty => - val message = - s"Persisting bucketed data source relation $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + - "Input path(s): " + relation.location.paths.mkString("\n", "\n", "") - (None, message) - - case (Some(serde), relation: HadoopFsRelation) => - val message = - s"Persisting data source relation $qualifiedTableName with multiple input paths into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + - s"Input paths: " + relation.location.paths.mkString("\n", "\n", "") - (None, message) - - case (Some(serde), _) => - val message = - s"Data source relation $qualifiedTableName is not a " + - s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " + - "in Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - - case _ => - val message = - s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + - s"Persisting data source relation $qualifiedTableName into Hive metastore in " + - s"Spark SQL specific format, which is NOT compatible with Hive." - (None, message) - } - - (hiveCompatibleTable, logMessage) match { - case (Some(table), message) => - // We first try to save the metadata of the table in a Hive compatible way. - // If Hive throws an error, we fall back to save its metadata in the Spark SQL - // specific way. - try { - logInfo(message) - sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false) - } catch { - case NonFatal(e) => - val warningMessage = - s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " + - s"it into Hive metastore in Spark SQL specific format." 
- logWarning(warningMessage, e) - val table = newSparkSQLSpecificMetastoreTable() - sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false) - } - - case (None, message) => - logWarning(message) - val table = newSparkSQLSpecificMetastoreTable() - sparkSession.sessionState.catalog.createTable(table, ignoreIfExists = false) - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 2eff9337bc..3817f919f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -27,10 +27,9 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog} -import org.apache.spark.sql.catalyst.catalog.CatalogTypes._ +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ @@ -234,10 +233,8 @@ case class AlterTableSetPropertiesCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - val ident = if (isView) "VIEW" else "TABLE" val catalog = sparkSession.sessionState.catalog DDLUtils.verifyAlterTableType(catalog, tableName, isView) - DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident") val table = catalog.getTableMetadata(tableName) // This overrides old properties val newTable = table.copy(properties = table.properties ++ properties) @@ -264,10 +261,8 @@ case class AlterTableUnsetPropertiesCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - val ident = if (isView) "VIEW" else "TABLE" val catalog = sparkSession.sessionState.catalog DDLUtils.verifyAlterTableType(catalog, tableName, isView) - DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident") val table = catalog.getTableMetadata(tableName) if (!ifExists) { propKeys.foreach { k => @@ -445,11 +440,11 @@ case class AlterTableRecoverPartitionsCommand( if (!catalog.tableExists(tableName)) { throw new AnalysisException(s"Table $tableName in $cmd does not exist.") } - val table = catalog.getTableMetadata(tableName) if (catalog.isTemporaryTable(tableName)) { throw new AnalysisException( s"Operation not allowed: $cmd on temporary tables: $tableName") } + val table = catalog.getTableMetadata(tableName) if (DDLUtils.isDatasourceTable(table)) { throw new AnalysisException( s"Operation not allowed: $cmd on datasource tables: $tableName") @@ -458,7 +453,7 @@ case class AlterTableRecoverPartitionsCommand( throw new AnalysisException( s"Operation not allowed: $cmd only works on external tables: $tableName") } - if (!DDLUtils.isTablePartitioned(table)) { + if (table.partitionColumnNames.isEmpty) { throw new AnalysisException( s"Operation not allowed: $cmd only works on partitioned tables: $tableName") } @@ -584,13 +579,8 @@ case class AlterTableSetLocationCommand( 
object DDLUtils { - - def isDatasourceTable(props: Map[String, String]): Boolean = { - props.contains(DATASOURCE_PROVIDER) - } - def isDatasourceTable(table: CatalogTable): Boolean = { - isDatasourceTable(table.properties) + table.provider.isDefined && table.provider.get != "hive" } /** @@ -611,78 +601,4 @@ object DDLUtils { case _ => }) } - - /** - * If the given table properties (or SerDe properties) contains datasource properties, - * throw an exception. - */ - def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = { - val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX)) - if (datasourceKeys.nonEmpty) { - throw new AnalysisException(s"Operation not allowed: $operation property keys may not " + - s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}") - } - } - - def isTablePartitioned(table: CatalogTable): Boolean = { - table.partitionColumnNames.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS) - } - - // A persisted data source table always store its schema in the catalog. - def getSchemaFromTableProperties(metadata: CatalogTable): StructType = { - require(isDatasourceTable(metadata)) - val msgSchemaCorrupted = "Could not read schema from the metastore because it is corrupted." - val props = metadata.properties - props.get(DATASOURCE_SCHEMA).map { schema => - // Originally, we used spark.sql.sources.schema to store the schema of a data source table. - // After SPARK-6024, we removed this flag. - // Although we are not using spark.sql.sources.schema any more, we need to still support. - DataType.fromJson(schema).asInstanceOf[StructType] - } getOrElse { - props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => - val parts = (0 until numParts.toInt).map { index => - val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull - if (part == null) { - throw new AnalysisException(msgSchemaCorrupted + - s" (missing part $index of the schema, $numParts parts are expected).") - } - part - } - // Stick all parts back to a single schema string. - DataType.fromJson(parts.mkString).asInstanceOf[StructType] - } getOrElse(throw new AnalysisException(msgSchemaCorrupted)) - } - } - - private def getColumnNamesByType( - props: Map[String, String], colType: String, typeName: String): Seq[String] = { - require(isDatasourceTable(props)) - - for { - numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq - index <- 0 until numCols.toInt - } yield props.getOrElse( - s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index", - throw new AnalysisException( - s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing." 
- ) - ) - } - - def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { - getColumnNamesByType(metadata.properties, "part", "partitioning columns") - } - - def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = { - if (isDatasourceTable(metadata)) { - metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets => - BucketSpec( - numBuckets.toInt, - getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"), - getColumnNamesByType(metadata.properties, "sort", "sorting columns")) - } - } else { - None - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 720399ecc5..af2b5ffd1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -119,11 +119,9 @@ case class CreateTableLikeCommand( case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE") sparkSession.sessionState.catalog.createTable(table, ifNotExists) Seq.empty[Row] } - } @@ -414,8 +412,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF describeSchema(catalog.lookupRelation(table).schema, result) } else { val metadata = catalog.getTableMetadata(table) + describeSchema(metadata.schema, result) - describeSchema(metadata, result) if (isExtended) { describeExtended(metadata, result) } else if (isFormatted) { @@ -429,20 +427,10 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - if (DDLUtils.isDatasourceTable(table)) { - val partColNames = DDLUtils.getPartitionColumnsFromTableProperties(table) - if (partColNames.nonEmpty) { - val userSpecifiedSchema = DDLUtils.getSchemaFromTableProperties(table) - append(buffer, "# Partition Information", "", "") - append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) - describeSchema(StructType(partColNames.map(userSpecifiedSchema(_))), buffer) - } - } else { - if (table.partitionColumnNames.nonEmpty) { - append(buffer, "# Partition Information", "", "") - append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) - describeSchema(table.partitionSchema, buffer) - } + if (table.partitionColumnNames.nonEmpty) { + append(buffer, "# Partition Information", "", "") + append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) + describeSchema(table.partitionSchema, buffer) } } @@ -466,11 +454,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF append(buffer, "Table Type:", table.tableType.name, "") append(buffer, "Table Parameters:", "", "") - table.properties.filterNot { - // Hides schema properties that hold user-defined schema, partition columns, and bucketing - // information since they are already extracted and shown in other parts. 
- case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA) - }.foreach { case (key, value) => + table.properties.foreach { case (key, value) => append(buffer, s" $key", value, "") } @@ -493,7 +477,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - def appendBucketInfo(bucketSpec: Option[BucketSpec]) = bucketSpec match { + metadata.bucketSpec match { case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => append(buffer, "Num Buckets:", numBuckets.toString, "") append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "") @@ -501,23 +485,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF case _ => } - - if (DDLUtils.isDatasourceTable(metadata)) { - appendBucketInfo(DDLUtils.getBucketSpecFromTableProperties(metadata)) - } else { - appendBucketInfo(metadata.bucketSpec) - } - } - - private def describeSchema( - tableDesc: CatalogTable, - buffer: ArrayBuffer[Row]): Unit = { - if (DDLUtils.isDatasourceTable(tableDesc)) { - val schema = DDLUtils.getSchemaFromTableProperties(tableDesc) - describeSchema(schema, buffer) - } else { - describeSchema(tableDesc.schema, buffer) - } } private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = { @@ -678,7 +645,7 @@ case class ShowPartitionsCommand( s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}") } - if (!DDLUtils.isTablePartitioned(tab)) { + if (tab.partitionColumnNames.isEmpty) { throw new AnalysisException( s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}") } @@ -729,6 +696,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val tableMetadata = catalog.getTableMetadata(table) + // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table. 
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { showCreateDataSourceTable(tableMetadata) } else { @@ -872,15 +840,14 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showDataSourceTableDataColumns( metadata: CatalogTable, builder: StringBuilder): Unit = { - val schema = DDLUtils.getSchemaFromTableProperties(metadata) - val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") + val columns = metadata.schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}") builder ++= columns.mkString("(", ", ", ")\n") } private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = { val props = metadata.properties - builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n" + builder ++= s"USING ${metadata.provider.get}\n" val dataSourceOptions = metadata.storage.properties.filterNot { case (key, value) => @@ -900,12 +867,12 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showDataSourceTableNonDataColumns( metadata: CatalogTable, builder: StringBuilder): Unit = { - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata) + val partCols = metadata.partitionColumnNames if (partCols.nonEmpty) { builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n" } - DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec => + metadata.bucketSpec.foreach { spec => if (spec.bucketColumnNames.nonEmpty) { builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 733ba18528..5eba7df060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} -import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} +import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -204,24 +204,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { - val schema = DDLUtils.getSchemaFromTableProperties(table) - - // We only need names at here since userSpecifiedSchema we loaded from the metastore - // contains partition columns. We can always get datatypes of partitioning columns - // from userSpecifiedSchema. 
- val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(table) - - val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(table) - - val options = table.storage.properties val dataSource = DataSource( sparkSession, - userSpecifiedSchema = Some(schema), - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), - options = options) + userSpecifiedSchema = Some(table.schema), + partitionColumns = table.partitionColumnNames, + bucketSpec = table.bucketSpec, + className = table.provider.get, + options = table.storage.properties) LogicalRelation( dataSource.resolveRelation(), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 447c237e3a..7880c7cfa1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.UnsafeKVExternalSorter -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -48,6 +47,11 @@ private[datasources] case class WriteRelation( prepareJobForWrite: Job => OutputWriterFactory, bucketSpec: Option[BucketSpec]) +object WriterContainer { + val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID" + val DATASOURCE_OUTPUTPATH = "spark.sql.sources.output.path" +} + private[datasources] abstract class BaseWriterContainer( @transient val relation: WriteRelation, @transient private val job: Job, @@ -94,7 +98,7 @@ private[datasources] abstract class BaseWriterContainer( // This UUID is sent to executor side together with the serialized `Configuration` object within // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate // unique task output files. - job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString) + job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString) // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. 
If we initialize the TaskAttemptContext first, @@ -244,7 +248,7 @@ private[datasources] class DefaultWriterContainer( def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) val configuration = taskAttemptContext.getConfiguration - configuration.set(DATASOURCE_OUTPUTPATH, outputPath) + configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath) var writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) @@ -352,10 +356,12 @@ private[datasources] class DynamicPartitionWriterContainer( val configuration = taskAttemptContext.getConfiguration val path = if (partitionColumns.nonEmpty) { val partitionPath = getPartitionString(key).getString(0) - configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString) + configuration.set( + WriterContainer.DATASOURCE_OUTPUTPATH, + new Path(outputPath, partitionPath).toString) new Path(getWorkPath, partitionPath).toString } else { - configuration.set(DATASOURCE_OUTPUTPATH, outputPath) + configuration.set(WriterContainer.DATASOURCE_OUTPUTPATH, outputPath) getWorkPath } val bucketId = getBucketIdFromKey(key) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 6b2f9fc61e..de2d633c0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -30,8 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils -import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer} import org.apache.spark.sql.types._ object CSVRelation extends Logging { @@ -192,7 +191,7 @@ private[csv] class CsvOutputWriter( new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) + val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 27910e2cdd..16150b91d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import 
org.apache.spark.sql.types.StructType @@ -164,7 +163,7 @@ private[json] class JsonOutputWriter( new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) + val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 9c4778acf5..9208c82179 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -44,7 +44,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf @@ -547,8 +546,7 @@ private[parquet] class ParquetOutputWriter( // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get( - CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) + val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index abb6059f75..a0c3fd53fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} @@ -131,7 +130,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) + val uniqueWriteJobId = 
configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala index ad69137f74..52e648a917 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala @@ -28,10 +28,9 @@ object HiveSerDe { * * @param source Currently the source abbreviation can be one of the following: * SequenceFile, RCFile, ORC, PARQUET, and case insensitive. - * @param conf SQLConf * @return HiveSerDe associated with the specified source */ - def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = { + def sourceToSerDe(source: String): Option[HiveSerDe] = { val serdeMap = Map( "sequencefile" -> HiveSerDe( @@ -42,8 +41,7 @@ object HiveSerDe { HiveSerDe( inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option(conf.getConfString("hive.default.rcfile.serde", - "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))), + serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")), "orc" -> HiveSerDe( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index be1bccbd99..8dd883b37b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -243,7 +243,7 @@ class DDLCommandSuite extends PlanTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab STORED AS $s" val ct = parseAs[CreateTable](query) - val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) @@ -276,7 +276,7 @@ class DDLCommandSuite extends PlanTest { val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" if (supportedSources.contains(s)) { val ct = parseAs[CreateTable](query) - val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == Some("anything")) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) @@ -295,7 +295,7 @@ class DDLCommandSuite extends PlanTest { val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" if (supportedSources.contains(s)) { val ct = parseAs[CreateTable](query) - val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf) + val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == hiveSerde.get.serde) assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0f7fda7666..e6ae42258d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -93,7 +92,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .add("col2", "string") .add("a", "int") .add("b", "int"), - provider = Some("parquet"), + provider = Some("hive"), partitionColumnNames = Seq("a", "b"), createTime = 0L) } @@ -277,10 +276,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { """.stripMargin) val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)) - assert(expectedSchema == - DDLUtils.getSchemaFromTableProperties(tableMetadata)) - assert(expectedPartitionCols == - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata)) + assert(expectedSchema == tableMetadata.schema) + assert(expectedPartitionCols == tableMetadata.partitionColumnNames) } } @@ -399,41 +396,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e.message == "Found duplicate column(s) in bucket: a") } - test("Describe Table with Corrupted Schema") { - import testImplicits._ - - val tabName = "tab1" - withTempPath { dir => - val path = dir.getCanonicalPath - val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("col1", "col2") - df.write.format("json").save(path) - val uri = dir.toURI - - withTable(tabName) { - sql( - s""" - |CREATE TABLE $tabName - |USING json - |OPTIONS ( - | path '$uri' - |) - """.stripMargin) - - val catalog = spark.sessionState.catalog - val table = catalog.getTableMetadata(TableIdentifier(tabName)) - val newProperties = table.properties.filterKeys(key => - key != CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS) - val newTable = table.copy(properties = newProperties) - catalog.alterTable(newTable) - - val e = intercept[AnalysisException] { - sql(s"DESC $tabName") - }.getMessage - assert(e.contains(s"Could not read schema from the metastore because it is corrupted")) - } - } - } - test("Refresh table after changing the data source table partitioning") { import testImplicits._ @@ -460,10 +422,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |) """.stripMargin) val tableMetadata = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchema = DDLUtils.getSchemaFromTableProperties(tableMetadata) - assert(tableSchema == schema) - val partCols = DDLUtils.getPartitionColumnsFromTableProperties(tableMetadata) - assert(partCols == partitionCols) + assert(tableMetadata.schema == schema) + assert(tableMetadata.partitionColumnNames == partitionCols) // Change the schema val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) @@ -472,23 +432,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // No change on the schema val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchemaBeforeRefresh = - DDLUtils.getSchemaFromTableProperties(tableMetadataBeforeRefresh) - 
assert(tableSchemaBeforeRefresh == schema) - val partColsBeforeRefresh = - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh) - assert(partColsBeforeRefresh == partitionCols) + assert(tableMetadataBeforeRefresh.schema == schema) + assert(tableMetadataBeforeRefresh.partitionColumnNames == partitionCols) // Refresh does not affect the schema spark.catalog.refreshTable(tabName) val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName)) - val tableSchemaAfterRefresh = - DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh) - assert(tableSchemaAfterRefresh == schema) - val partColsAfterRefresh = - DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh) - assert(partColsAfterRefresh == partitionCols) + assert(tableMetadataAfterRefresh.schema == schema) + assert(tableMetadataAfterRefresh.partitionColumnNames == partitionCols) } } } @@ -641,7 +593,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) assert(table.schema == new StructType().add("a", "int").add("b", "int")) - assert(table.properties(DATASOURCE_PROVIDER) == "parquet") + assert(table.provider == Some("parquet")) } } @@ -651,12 +603,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("CREATE TABLE tbl(a INT, b INT) USING parquet PARTITIONED BY (a)") val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) - assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible - assert(table.properties(DATASOURCE_PROVIDER) == "parquet") - assert(DDLUtils.getSchemaFromTableProperties(table) == - new StructType().add("a", IntegerType).add("b", IntegerType)) - assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == - Seq("a")) + assert(table.provider == Some("parquet")) + assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType)) + assert(table.partitionColumnNames == Seq("a")) } } @@ -667,12 +616,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS") val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) - assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible - assert(table.properties(DATASOURCE_PROVIDER) == "parquet") - assert(DDLUtils.getSchemaFromTableProperties(table) == - new StructType().add("a", IntegerType).add("b", IntegerType)) - assert(DDLUtils.getBucketSpecFromTableProperties(table) == - Some(BucketSpec(5, Seq("a"), Seq("b")))) + assert(table.provider == Some("parquet")) + assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType)) + assert(table.bucketSpec == Some(BucketSpec(5, Seq("a"), Seq("b")))) } } @@ -1096,7 +1042,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { catalog: SessionCatalog, tableIdent: TableIdentifier): Unit = { catalog.alterTable(catalog.getTableMetadata(tableIdent).copy( - properties = Map(DATASOURCE_PROVIDER -> "csv"))) + provider = Some("csv"))) } private def testSetProperties(isDatasourceTable: Boolean): Unit = { @@ -1108,9 +1054,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { convertToDatasourceTable(catalog, tableIdent) } def getProps: Map[String, String] = { - 
catalog.getTableMetadata(tableIdent).properties.filterKeys { k => - !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX) - } + catalog.getTableMetadata(tableIdent).properties } assert(getProps.isEmpty) // set table properties @@ -1124,11 +1068,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { intercept[AnalysisException] { sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')") } - // datasource table property keys are not allowed - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')") - } - assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo")) } private def testUnsetProperties(isDatasourceTable: Boolean): Unit = { @@ -1140,9 +1079,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { convertToDatasourceTable(catalog, tableIdent) } def getProps: Map[String, String] = { - catalog.getTableMetadata(tableIdent).properties.filterKeys { k => - !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX) - } + catalog.getTableMetadata(tableIdent).properties } // unset table properties sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')") @@ -1164,11 +1101,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // property to unset does not exist, but "IF EXISTS" is specified sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')") assert(getProps == Map("x" -> "y")) - // datasource table property keys are not allowed - val e2 = intercept[AnalysisException] { - sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')") - } - assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo")) } private def testSetLocation(isDatasourceTable: Boolean): Unit = { @@ -1573,10 +1505,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - test("create table with datasource properties (not allowed)") { - assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')") - } - test("Create Hive Table As Select") { import testImplicits._ withTable("t", "t1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 49153f7736..729c9fdda5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -201,7 +201,7 @@ class CreateTableAsSelectSuite """.stripMargin ) val table = catalog.getTableMetadata(TableIdentifier("t")) - assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == Seq("a")) + assert(table.partitionColumnNames == Seq("a")) } } @@ -217,8 +217,7 @@ class CreateTableAsSelectSuite """.stripMargin ) val table = catalog.getTableMetadata(TableIdentifier("t")) - assert(DDLUtils.getBucketSpecFromTableProperties(table) == - Option(BucketSpec(5, Seq("a"), Seq("b")))) + assert(table.bucketSpec == Option(BucketSpec(5, Seq("a"), Seq("b")))) } } -- cgit v1.2.3
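For reference, the table-properties "hack" that this patch relocates into `HiveExternalCatalog` works roughly as sketched below. The property keys are taken from the code removed above; the helper name and the default split threshold are assumptions (the real threshold came from the now-removed `schemaStringLengthThreshold` conf).

```scala
// Sketch of the encoding formerly done in CreateDataSourceTableUtils and now
// applied only inside HiveExternalCatalog.
import scala.collection.mutable

import org.apache.spark.sql.types.StructType

def encodeDataSourceTableProperties(
    schema: StructType,
    provider: String,
    partitionColumns: Seq[String],
    threshold: Int = 4000  // assumed default; previously spark.sql.sources.schemaStringLengthThreshold
  ): Map[String, String] = {
  val props = new mutable.HashMap[String, String]
  props.put("spark.sql.sources.provider", provider)

  // The JSON schema may exceed the metastore's limit on a single property value,
  // so it is split into fixed-size parts stored under numbered keys.
  val parts = schema.json.grouped(threshold).toSeq
  props.put("spark.sql.sources.schema.numParts", parts.size.toString)
  parts.zipWithIndex.foreach { case (part, index) =>
    props.put(s"spark.sql.sources.schema.part.$index", part)
  }

  if (partitionColumns.nonEmpty) {
    props.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
    partitionColumns.zipWithIndex.foreach { case (col, index) =>
      props.put(s"spark.sql.sources.schema.partCol.$index", col)
    }
  }
  props.toMap
}
```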