author | Dongjoon Hyun <dongjoon@apache.org> | 2016-05-27 11:10:31 -0700
---|---|---
committer | Andrew Or <andrew@databricks.com> | 2016-05-27 11:10:31 -0700
commit | 4538443e276597530a27c6922e48503677b13956 (patch)
tree | fb758a4f8676b6a93ff9d06c2b6e6441fbba0a9c /sql
parent | d24e251572d39a453293cabfe14e4aed25a55208 (diff)
[SPARK-15584][SQL] Abstract duplicate code: `spark.sql.sources.` properties
## What changes were proposed in this pull request?
This PR replaces hard-coded `spark.sql.sources.*` string literals with the corresponding `CreateDataSourceTableUtils.*` constants.
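The shape of the change is easiest to see in a condensed form. The sketch below pulls a few of the constants from `CreateDataSourceTableUtils` as they appear in the diff (the real object also defines the partition, bucket, and sort column prefixes) and contrasts a call site before and after the refactoring.

```scala
// Condensed sketch of the pattern: each metastore property key is defined
// once and reused, instead of repeating the raw "spark.sql.sources.*" literal
// at every call site.
object CreateDataSourceTableUtils {
  val DATASOURCE_PREFIX = "spark.sql.sources."
  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
}

// Before: tableProperties.put("spark.sql.sources.provider", provider)
// After:  tableProperties.put(CreateDataSourceTableUtils.DATASOURCE_PROVIDER, provider)
// A misspelled key now fails to compile instead of silently writing the wrong
// property name into the metastore.
```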
## How was this patch tested?
Passes the existing Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13349 from dongjoon-hyun/SPARK-15584.
Diffstat (limited to 'sql')
15 files changed, 93 insertions, 93 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index deedb68a78..4b9aab612e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -256,15 +256,15 @@ case class CreateDataSourceTableAsSelectCommand( object CreateDataSourceTableUtils extends Logging { - // TODO: Actually replace usages with these variables (SPARK-15584) - val DATASOURCE_PREFIX = "spark.sql.sources." val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID" val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path" - val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema." + val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" + val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" + val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." @@ -296,7 +296,7 @@ object CreateDataSourceTableUtils extends Logging { options: Map[String, String], isExternal: Boolean): Unit = { val tableProperties = new mutable.HashMap[String, String] - tableProperties.put("spark.sql.sources.provider", provider) + tableProperties.put(DATASOURCE_PROVIDER, provider) // Saves optional user specified schema. Serialized JSON schema string may be too long to be // stored into a single metastore SerDe property. In this case, we split the JSON string and @@ -306,34 +306,32 @@ object CreateDataSourceTableUtils extends Logging { val schemaJsonString = schema.json // Split the JSON string. 
val parts = schemaJsonString.grouped(threshold).toSeq - tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString) + tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) parts.zipWithIndex.foreach { case (part, index) => - tableProperties.put(s"spark.sql.sources.schema.part.$index", part) + tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) } } if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) { - tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString) + tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) partitionColumns.zipWithIndex.foreach { case (partCol, index) => - tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol) + tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol) } } if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) { val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get - tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString) - tableProperties.put("spark.sql.sources.schema.numBucketCols", - bucketColumnNames.length.toString) + tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) + tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString) bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => - tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol) + tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol) } if (sortColumnNames.nonEmpty) { - tableProperties.put("spark.sql.sources.schema.numSortCols", - sortColumnNames.length.toString) + tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString) sortColumnNames.zipWithIndex.foreach { case (sortCol, index) => - tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol) + tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 15eba3b011..95bac94996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.types._ @@ -464,7 +464,7 @@ case class AlterTableSetLocationCommand( object DDLUtils { def isDatasourceTable(props: Map[String, String]): Boolean = { - props.contains("spark.sql.sources.provider") + props.contains(DATASOURCE_PROVIDER) } def isDatasourceTable(table: CatalogTable): Boolean = { @@ -503,8 +503,7 @@ object DDLUtils { } def isTablePartitioned(table: CatalogTable): Boolean = { - table.partitionColumns.nonEmpty || - table.properties.contains("spark.sql.sources.schema.numPartCols") + table.partitionColumns.nonEmpty || 
table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS) } // A persisted data source table may not store its schema in the catalog. In this case, its schema @@ -512,15 +511,15 @@ object DDLUtils { def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = { require(isDatasourceTable(metadata)) val props = metadata.properties - if (props.isDefinedAt("spark.sql.sources.schema")) { + if (props.isDefinedAt(DATASOURCE_SCHEMA)) { // Originally, we used spark.sql.sources.schema to store the schema of a data source table. // After SPARK-6024, we removed this flag. // Although we are not using spark.sql.sources.schema any more, we need to still support. - props.get("spark.sql.sources.schema").map(DataType.fromJson(_).asInstanceOf[StructType]) + props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType]) } else { - metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts => + metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => val parts = (0 until numParts.toInt).map { index => - val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull + val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull if (part == null) { throw new AnalysisException( "Could not read schema from the metastore because it is corrupted " + @@ -543,7 +542,7 @@ object DDLUtils { numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq index <- 0 until numCols.toInt } yield props.getOrElse( - s"spark.sql.sources.schema.${colType}Col.$index", + s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index", throw new AnalysisException( s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing." ) @@ -556,7 +555,7 @@ object DDLUtils { def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = { if (isDatasourceTable(metadata)) { - metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets => + metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets => BucketSpec( numBuckets.toInt, getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index d1024090d3..2d6a3b4860 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -443,7 +443,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF table.properties.filterNot { // Hides schema properties that hold user-defined schema, partition columns, and bucketing // information since they are already extracted and shown in other parts. 
- case (key, _) => key.startsWith("spark.sql.sources.schema") + case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA) }.foreach { case (key, value) => append(buffer, s" $key", value, "") } @@ -860,7 +860,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = { val props = metadata.properties - builder ++= s"USING ${props("spark.sql.sources.provider")}\n" + builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n" val dataSourceOptions = metadata.storage.serdeProperties.filterNot { case (key, value) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index a3d87cd38b..2b4786542c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -101,7 +101,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ userSpecifiedSchema = userSpecifiedSchema, partitionColumns = partitionColumns, bucketSpec = bucketSpec, - className = table.properties("spark.sql.sources.provider"), + className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER), options = options) LogicalRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 61dcbebd64..f56b50a543 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.UnsafeKVExternalSorter +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -91,7 +92,7 @@ private[sql] abstract class BaseWriterContainer( // This UUID is sent to executor side together with the serialized `Configuration` object within // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate // unique task output files. - job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString) // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor // clones the Configuration object passed in. 
If we initialize the TaskAttemptContext first, @@ -241,7 +242,7 @@ private[sql] class DefaultWriterContainer( def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) val configuration = taskAttemptContext.getConfiguration - configuration.set("spark.sql.sources.output.path", outputPath) + configuration.set(DATASOURCE_OUTPUTPATH, outputPath) var writer = newOutputWriter(getWorkPath) writer.initConverter(dataSchema) @@ -349,11 +350,10 @@ private[sql] class DynamicPartitionWriterContainer( val configuration = taskAttemptContext.getConfiguration val path = if (partitionColumns.nonEmpty) { val partitionPath = getPartitionString(key).getString(0) - configuration.set( - "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) + configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString) new Path(getWorkPath, partitionPath).toString } else { - configuration.set("spark.sql.sources.output.path", outputPath) + configuration.set(DATASOURCE_OUTPUTPATH, outputPath) getWorkPath } val bucketId = getBucketIdFromKey(key) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 9849484dce..d72c8b9ac2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile} import org.apache.spark.sql.types._ @@ -168,7 +169,7 @@ private[sql] class CsvOutputWriter( new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 35f247692f..c7c5281196 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -32,8 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.JoinedRow -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -170,7 +169,7 @@ private[json] class 
JsonOutputWriter( new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index b47d41e166..ff7962df22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -521,7 +522,8 @@ private[sql] class ParquetOutputWriter( // partitions in the case of dynamic partitioning. override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = configuration.get( + CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index d9525efe6d..1e5bce4a75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} @@ -119,7 +120,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId new Path(path, 
f"part-r-$split%05d-$uniqueWriteJobId.txt$extension") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e975756685..ccb4006483 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.BucketSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -387,7 +387,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int"))) - assert(table.properties("spark.sql.sources.provider") == "parquet") + assert(table.properties(DATASOURCE_PROVIDER) == "parquet") } } @@ -398,7 +398,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible - assert(table.properties("spark.sql.sources.provider") == "parquet") + assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == Some(new StructType().add("a", IntegerType).add("b", IntegerType))) assert(DDLUtils.getPartitionColumnsFromTableProperties(table) == @@ -414,7 +414,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { val table = catalog.getTableMetadata(TableIdentifier("tbl")) assert(table.tableType == CatalogTableType.MANAGED) assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible - assert(table.properties("spark.sql.sources.provider") == "parquet") + assert(table.properties(DATASOURCE_PROVIDER) == "parquet") assert(DDLUtils.getSchemaFromTableProperties(table) == Some(new StructType().add("a", IntegerType).add("b", IntegerType))) assert(DDLUtils.getBucketSpecFromTableProperties(table) == @@ -747,7 +747,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { catalog: SessionCatalog, tableIdent: TableIdentifier): Unit = { catalog.alterTable(catalog.getTableMetadata(tableIdent).copy( - properties = Map("spark.sql.sources.provider" -> "csv"))) + properties = Map(DATASOURCE_PROVIDER -> "csv"))) } private def testSetProperties(isDatasourceTable: Boolean): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index ea721e4d9b..ff395f39b7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -30,6 
+30,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -74,9 +75,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable def schemaStringFromParts: Option[String] = { - table.properties.get("spark.sql.sources.schema.numParts").map { numParts => + table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => val parts = (0 until numParts.toInt).map { index => - val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull + val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull if (part == null) { throw new AnalysisException( "Could not read schema from the metastore because it is corrupted " + @@ -91,9 +92,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } def getColumnNames(colType: String): Seq[String] = { - table.properties.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").map { + table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map { numCols => (0 until numCols.toInt).map { index => - table.properties.getOrElse(s"spark.sql.sources.schema.${colType}Col.$index", + table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index", throw new AnalysisException( s"Could not read $colType columns from the metastore because it is corrupted " + s"(missing part $index of it, $numCols parts are expected).")) @@ -104,8 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // Originally, we used spark.sql.sources.schema to store the schema of a data source table. // After SPARK-6024, we removed this flag. // Although we are not using spark.sql.sources.schema any more, we need to still support. - val schemaString = - table.properties.get("spark.sql.sources.schema").orElse(schemaStringFromParts) + val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts) val userSpecifiedSchema = schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType]) @@ -115,7 +115,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // from userSpecifiedSchema. 
val partitionColumns = getColumnNames("part") - val bucketSpec = table.properties.get("spark.sql.sources.schema.numBuckets").map { n => + val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n => BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort")) } @@ -126,7 +126,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = userSpecifiedSchema, partitionColumns = partitionColumns, bucketSpec = bucketSpec, - className = table.properties("spark.sql.sources.provider"), + className = table.properties(DATASOURCE_PROVIDER), options = options) LogicalRelation( @@ -166,7 +166,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val qualifiedTableName = getQualifiedTableName(tableIdent) val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name) - if (table.properties.get("spark.sql.sources.provider").isDefined) { + if (table.properties.get(DATASOURCE_PROVIDER).isDefined) { val dataSourceTable = cachedDataSourceTables(qualifiedTableName) val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable) // Then, if alias is specified, wrap the table with a Subquery using the alias. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index f1198179a0..0e8c37df88 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -36,6 +36,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} import org.apache.spark.sql.sources.{Filter, _} @@ -217,7 +218,7 @@ private[orc] class OrcOutputWriter( private lazy val recordWriter: RecordWriter[NullWritable, Writable] = { recordWriterInstantiated = true - val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = conf.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val partition = taskAttemptId.getTaskID.getId val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 1e6de463b3..2c50cc88cc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -700,7 +700,7 @@ class 
MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. - CreateDataSourceTableUtils.createDataSourceTable( + createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), @@ -737,8 +737,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), properties = Map( - "spark.sql.sources.provider" -> "json", - "spark.sql.sources.schema" -> schema.json, + DATASOURCE_PROVIDER -> "json", + DATASOURCE_SCHEMA -> schema.json, "EXTERNAL" -> "FALSE")) sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false) @@ -762,13 +762,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val metastoreTable = sharedState.externalCatalog.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) - val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt + val numPartCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt assert(numPartCols == 2) val actualPartitionColumns = StructType( (0 until numPartCols).map { index => - df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index")) + df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index")) }) // Make sure partition columns are correctly stored in metastore. assert( @@ -798,19 +798,19 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) - val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt + val numBuckets = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt assert(numBuckets == 8) - val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt + val numBucketCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt assert(numBucketCols == 2) - val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt + val numSortCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt assert(numSortCols == 1) val actualBucketByColumns = StructType( (0 until numBucketCols).map { index => - df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index")) + df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index")) }) // Make sure bucketBy columns are correctly stored in metastore. assert( @@ -821,7 +821,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val actualSortByColumns = StructType( (0 until numSortCols).map { index => - df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index")) + df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index")) }) // Make sure sortBy columns are correctly stored in metastore. 
assert( @@ -913,7 +913,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTempDir { tempPath => val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) - CreateDataSourceTableUtils.createDataSourceTable( + createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), @@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema .forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType)) - CreateDataSourceTableUtils.createDataSourceTable( + createDataSourceTable( sparkSession = spark, tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), @@ -960,10 +960,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv ) val metastoreTable = sharedState.externalCatalog.getTable("default", "t") - assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1) - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets")) - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols")) - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols")) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETS)) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETCOLS)) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS)) checkAnswer(table("t"), Row(2, 1)) } @@ -984,10 +984,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv ) val metastoreTable = sharedState.externalCatalog.getTable("default", "t") - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols")) - assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) - assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) - assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1) checkAnswer(table("t"), Row(1, 2)) } @@ -1006,10 +1006,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv ) val metastoreTable = sharedState.externalCatalog.getTable("default", "t") - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols")) - assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) - assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) - assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols")) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1) + assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS)) checkAnswer(table("t"), Row(1, 2)) } @@ -1031,10 +1031,10 @@ class MetastoreDataSourcesSuite 
extends QueryTest with SQLTestUtils with TestHiv ) val metastoreTable = sharedState.externalCatalog.getTable("default", "t") - assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1) - assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2) - assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1) - assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1) + assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1) checkAnswer(table("t"), Row(2, 3, 1)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 6f374d713b..741abcb751 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -101,24 +101,22 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("show tblproperties of data source tables - basic") { checkAnswer( - sql("SHOW TBLPROPERTIES parquet_tab1") - .filter(s"key = 'spark.sql.sources.provider'"), - Row("spark.sql.sources.provider", "org.apache.spark.sql.parquet.DefaultSource") :: Nil + sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_PROVIDER'"), + Row(DATASOURCE_PROVIDER, "org.apache.spark.sql.parquet.DefaultSource") :: Nil ) checkAnswer( - sql("SHOW TBLPROPERTIES parquet_tab1(spark.sql.sources.provider)"), + sql(s"SHOW TBLPROPERTIES parquet_tab1($DATASOURCE_PROVIDER)"), Row("org.apache.spark.sql.parquet.DefaultSource") :: Nil ) checkAnswer( - sql("SHOW TBLPROPERTIES parquet_tab1") - .filter(s"key = 'spark.sql.sources.schema.numParts'"), - Row("spark.sql.sources.schema.numParts", "1") :: Nil + sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_SCHEMA_NUMPARTS'"), + Row(DATASOURCE_SCHEMA_NUMPARTS, "1") :: Nil ) checkAnswer( - sql("SHOW TBLPROPERTIES parquet_tab1('spark.sql.sources.schema.numParts')"), + sql(s"SHOW TBLPROPERTIES parquet_tab1('$DATASOURCE_SCHEMA_NUMPARTS')"), Row("1")) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 0fa1841415..1fb777ade4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.{sources, Row, SparkSession} import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal} import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -144,7 +145,7 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val uniqueWriteJobId = configuration.get(DATASOURCE_WRITEJOBUUID) val taskAttemptId = context.getTaskAttemptID val split = taskAttemptId.getTaskID.getId val name = FileOutputFormat.getOutputName(context) |
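Two of the behaviors touched by these renames are worth calling out in condensed form. First, the schema handling in `createDataSourceTables.scala` and `ddl.scala` above: the serialized JSON schema may be too long for a single metastore SerDe property, so it is written as a `numParts` count plus numbered `part.$index` chunks and concatenated again on read. The following sketch mirrors that round trip with a plain Scala map standing in for the metastore properties and an illustrative chunk-size `threshold`; it is a sketch of the mechanism, not the Spark code itself.

```scala
import scala.collection.mutable

object SchemaPropertiesSketch {
  // The same property keys this PR centralizes in CreateDataSourceTableUtils.
  val DATASOURCE_SCHEMA_PREFIX = "spark.sql.sources.schema."
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."

  // Write side: split the JSON so every chunk fits in one metastore property.
  def putSchema(props: mutable.Map[String, String], schemaJson: String, threshold: Int): Unit = {
    val parts = schemaJson.grouped(threshold).toSeq
    props.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      props.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    }
  }

  // Read side: concatenate the numbered parts back into one JSON string.
  def getSchemaJson(props: Map[String, String]): Option[String] =
    props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
      (0 until numParts.toInt).map(i => props(s"$DATASOURCE_SCHEMA_PART_PREFIX$i")).mkString
    }

  def main(args: Array[String]): Unit = {
    val props = mutable.HashMap.empty[String, String]
    putSchema(props, """{"type":"struct","fields":[]}""", threshold = 10)
    println(getSchemaJson(props.toMap)) // Some({"type":"struct","fields":[]})
  }
}
```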
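Second, the change that repeats across the CSV, JSON, Parquet, text, ORC, and SimpleTextRelation writers: each output writer reads the job-wide UUID that the driver stored under `DATASOURCE_WRITEJOBUUID` and folds it into its part-file name. The sketch below uses a plain map in place of the Hadoop `Configuration`/`TaskAttemptContext` the real writers receive, so it only illustrates the lookup and naming pattern.

```scala
object WriteJobUuidSketch {
  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"

  // The constant replaces the literal "spark.sql.sources.writeJobUUID" that
  // each file format previously spelled out on its own.
  def partFileName(conf: Map[String, String], split: Int, extension: String): String = {
    val uniqueWriteJobId = conf(DATASOURCE_WRITEJOBUUID)
    f"part-r-$split%05d-$uniqueWriteJobId$extension"
  }

  def main(args: Array[String]): Unit = {
    // The driver sets the UUID once per write job; every task reads it back,
    // so part files from different jobs never collide in the same directory.
    val conf = Map(DATASOURCE_WRITEJOBUUID -> java.util.UUID.randomUUID().toString)
    println(partFileName(conf, split = 3, extension = ".csv"))
  }
}
```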