author    Dongjoon Hyun <dongjoon@apache.org>  2016-05-27 11:10:31 -0700
committer Andrew Or <andrew@databricks.com>    2016-05-27 11:10:31 -0700
commit    4538443e276597530a27c6922e48503677b13956 (patch)
tree      fb758a4f8676b6a93ff9d06c2b6e6441fbba0a9c /sql
parent    d24e251572d39a453293cabfe14e4aed25a55208 (diff)
[SPARK-15584][SQL] Abstract duplicate code: `spark.sql.sources.` properties
## What changes were proposed in this pull request?

This PR replaces `spark.sql.sources.` strings with `CreateDataSourceTableUtils.*` constant variables.

## How was this patch tested?

Pass the existing Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13349 from dongjoon-hyun/SPARK-15584.
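For readers skimming the diff, the gist of the refactoring is captured in the sketch below: metastore property keys that were previously repeated as raw `"spark.sql.sources.*"` strings are now built from shared constants and referenced by name. This is an illustrative sketch only; the object name, `main` method, and sample values are hypothetical, while the constant definitions mirror those added to `CreateDataSourceTableUtils` in this patch.

```scala
import scala.collection.mutable

// Hypothetical container used only for this sketch; in the patch the constants
// live in org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.
object DataSourcePropertyKeysSketch {
  val DATASOURCE_PREFIX = "spark.sql.sources."
  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."

  def main(args: Array[String]): Unit = {
    val tableProperties = new mutable.HashMap[String, String]
    // Before the patch: tableProperties.put("spark.sql.sources.provider", "parquet")
    // After the patch, the raw string literal is replaced by the shared constant:
    tableProperties.put(DATASOURCE_PROVIDER, "parquet")
    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, "1")
    tableProperties.put(s"${DATASOURCE_SCHEMA_PART_PREFIX}0", """{"type":"struct","fields":[]}""")
    assert(tableProperties(DATASOURCE_PROVIDER) == "parquet")
    tableProperties.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```

Deriving every key from `DATASOURCE_PREFIX` keeps the metastore property names in one place, so a typo at a call site becomes a compile-time reference error instead of a silently mismatched key.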
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala      | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                          | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                       |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala       |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala          | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala          |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala      |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala      |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                     | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                      | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala                         |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala                 | 58
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala                | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala                     |  3
15 files changed, 93 insertions, 93 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index deedb68a78..4b9aab612e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -256,15 +256,15 @@ case class CreateDataSourceTableAsSelectCommand(
object CreateDataSourceTableUtils extends Logging {
- // TODO: Actually replace usages with these variables (SPARK-15584)
-
val DATASOURCE_PREFIX = "spark.sql.sources."
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
- val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+ val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
+ val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+ val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
@@ -296,7 +296,7 @@ object CreateDataSourceTableUtils extends Logging {
options: Map[String, String],
isExternal: Boolean): Unit = {
val tableProperties = new mutable.HashMap[String, String]
- tableProperties.put("spark.sql.sources.provider", provider)
+ tableProperties.put(DATASOURCE_PROVIDER, provider)
// Saves optional user specified schema. Serialized JSON schema string may be too long to be
// stored into a single metastore SerDe property. In this case, we split the JSON string and
@@ -306,34 +306,32 @@ object CreateDataSourceTableUtils extends Logging {
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
- tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
- tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
+ tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
}
if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
- tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol)
+ tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}
if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
- tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
- tableProperties.put("spark.sql.sources.schema.numBucketCols",
- bucketColumnNames.length.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString)
bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
+ tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", bucketCol)
}
if (sortColumnNames.nonEmpty) {
- tableProperties.put("spark.sql.sources.schema.numSortCols",
- sortColumnNames.length.toString)
+ tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, sortColumnNames.length.toString)
sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
- tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
+ tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", sortCol)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15eba3b011..95bac94996 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
@@ -464,7 +464,7 @@ case class AlterTableSetLocationCommand(
object DDLUtils {
def isDatasourceTable(props: Map[String, String]): Boolean = {
- props.contains("spark.sql.sources.provider")
+ props.contains(DATASOURCE_PROVIDER)
}
def isDatasourceTable(table: CatalogTable): Boolean = {
@@ -503,8 +503,7 @@ object DDLUtils {
}
def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumns.nonEmpty ||
- table.properties.contains("spark.sql.sources.schema.numPartCols")
+ table.partitionColumns.nonEmpty || table.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS)
}
// A persisted data source table may not store its schema in the catalog. In this case, its schema
@@ -512,15 +511,15 @@ object DDLUtils {
def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
require(isDatasourceTable(metadata))
val props = metadata.properties
- if (props.isDefinedAt("spark.sql.sources.schema")) {
+ if (props.isDefinedAt(DATASOURCE_SCHEMA)) {
// Originally, we used spark.sql.sources.schema to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using spark.sql.sources.schema any more, we need to still support.
- props.get("spark.sql.sources.schema").map(DataType.fromJson(_).asInstanceOf[StructType])
+ props.get(DATASOURCE_SCHEMA).map(DataType.fromJson(_).asInstanceOf[StructType])
} else {
- metadata.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+ metadata.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
- val part = metadata.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+ val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(
"Could not read schema from the metastore because it is corrupted " +
@@ -543,7 +542,7 @@ object DDLUtils {
numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
index <- 0 until numCols.toInt
} yield props.getOrElse(
- s"spark.sql.sources.schema.${colType}Col.$index",
+ s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
throw new AnalysisException(
s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
)
@@ -556,7 +555,7 @@ object DDLUtils {
def getBucketSpecFromTableProperties(metadata: CatalogTable): Option[BucketSpec] = {
if (isDatasourceTable(metadata)) {
- metadata.properties.get("spark.sql.sources.schema.numBuckets").map { numBuckets =>
+ metadata.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { numBuckets =>
BucketSpec(
numBuckets.toInt,
getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d1024090d3..2d6a3b4860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -443,7 +443,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
table.properties.filterNot {
// Hides schema properties that hold user-defined schema, partition columns, and bucketing
// information since they are already extracted and shown in other parts.
- case (key, _) => key.startsWith("spark.sql.sources.schema")
+ case (key, _) => key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA)
}.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
@@ -860,7 +860,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
val props = metadata.properties
- builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+ builder ++= s"USING ${props(CreateDataSourceTableUtils.DATASOURCE_PROVIDER)}\n"
val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
case (key, value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index a3d87cd38b..2b4786542c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -101,7 +101,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- className = table.properties("spark.sql.sources.provider"),
+ className = table.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
options = options)
LogicalRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 61dcbebd64..f56b50a543 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -91,7 +92,7 @@ private[sql] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+ job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -241,7 +242,7 @@ private[sql] class DefaultWriterContainer(
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
val configuration = taskAttemptContext.getConfiguration
- configuration.set("spark.sql.sources.output.path", outputPath)
+ configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
var writer = newOutputWriter(getWorkPath)
writer.initConverter(dataSchema)
@@ -349,11 +350,10 @@ private[sql] class DynamicPartitionWriterContainer(
val configuration = taskAttemptContext.getConfiguration
val path = if (partitionColumns.nonEmpty) {
val partitionPath = getPartitionString(key).getString(0)
- configuration.set(
- "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+ configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString)
new Path(getWorkPath, partitionPath).toString
} else {
- configuration.set("spark.sql.sources.output.path", outputPath)
+ configuration.set(DATASOURCE_OUTPUTPATH, outputPath)
getWorkPath
}
val bucketId = getBucketIdFromKey(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 9849484dce..d72c8b9ac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._
@@ -168,7 +169,7 @@ private[sql] class CsvOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 35f247692f..c7c5281196 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,8 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -170,7 +169,7 @@ private[json] class JsonOutputWriter(
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b47d41e166..ff7962df22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -521,7 +522,8 @@ private[sql] class ParquetOutputWriter(
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = configuration.get(
+ CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index d9525efe6d..1e5bce4a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
@@ -119,7 +120,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = configuration.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e975756685..ccb4006483 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -387,7 +387,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.schema == Seq(CatalogColumn("a", "int"), CatalogColumn("b", "int")))
- assert(table.properties("spark.sql.sources.provider") == "parquet")
+ assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
}
}
@@ -398,7 +398,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
- assert(table.properties("spark.sql.sources.provider") == "parquet")
+ assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
assert(DDLUtils.getPartitionColumnsFromTableProperties(table) ==
@@ -414,7 +414,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.schema.isEmpty) // partitioned datasource table is not hive-compatible
- assert(table.properties("spark.sql.sources.provider") == "parquet")
+ assert(table.properties(DATASOURCE_PROVIDER) == "parquet")
assert(DDLUtils.getSchemaFromTableProperties(table) ==
Some(new StructType().add("a", IntegerType).add("b", IntegerType)))
assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
@@ -747,7 +747,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog: SessionCatalog,
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
- properties = Map("spark.sql.sources.provider" -> "csv")))
+ properties = Map(DATASOURCE_PROVIDER -> "csv")))
}
private def testSetProperties(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea721e4d9b..ff395f39b7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,9 +75,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
def schemaStringFromParts: Option[String] = {
- table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
+ table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
- val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
+ val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(
"Could not read schema from the metastore because it is corrupted " +
@@ -91,9 +92,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
def getColumnNames(colType: String): Seq[String] = {
- table.properties.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").map {
+ table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
numCols => (0 until numCols.toInt).map { index =>
- table.properties.getOrElse(s"spark.sql.sources.schema.${colType}Col.$index",
+ table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
throw new AnalysisException(
s"Could not read $colType columns from the metastore because it is corrupted " +
s"(missing part $index of it, $numCols parts are expected)."))
@@ -104,8 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Originally, we used spark.sql.sources.schema to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using spark.sql.sources.schema any more, we need to still support.
- val schemaString =
- table.properties.get("spark.sql.sources.schema").orElse(schemaStringFromParts)
+ val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
val userSpecifiedSchema =
schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
@@ -115,7 +115,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// from userSpecifiedSchema.
val partitionColumns = getColumnNames("part")
- val bucketSpec = table.properties.get("spark.sql.sources.schema.numBuckets").map { n =>
+ val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
}
@@ -126,7 +126,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- className = table.properties("spark.sql.sources.provider"),
+ className = table.properties(DATASOURCE_PROVIDER),
options = options)
LogicalRelation(
@@ -166,7 +166,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val qualifiedTableName = getQualifiedTableName(tableIdent)
val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
- if (table.properties.get("spark.sql.sources.provider").isDefined) {
+ if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index f1198179a0..0e8c37df88 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
@@ -217,7 +218,7 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = conf.get(CreateDataSourceTableUtils.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 1e6de463b3..2c50cc88cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -700,7 +700,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
// Manually create a metastore data source table.
- CreateDataSourceTableUtils.createDataSourceTable(
+ createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
@@ -737,8 +737,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
"path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
),
properties = Map(
- "spark.sql.sources.provider" -> "json",
- "spark.sql.sources.schema" -> schema.json,
+ DATASOURCE_PROVIDER -> "json",
+ DATASOURCE_SCHEMA -> schema.json,
"EXTERNAL" -> "FALSE"))
sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
@@ -762,13 +762,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
- val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
+ val numPartCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt
assert(numPartCols == 2)
val actualPartitionColumns =
StructType(
(0 until numPartCols).map { index =>
- df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index"))
+ df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index"))
})
// Make sure partition columns are correctly stored in metastore.
assert(
@@ -798,19 +798,19 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
- val numBuckets = metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt
+ val numBuckets = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt
assert(numBuckets == 8)
- val numBucketCols = metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt
+ val numBucketCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt
assert(numBucketCols == 2)
- val numSortCols = metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt
+ val numSortCols = metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt
assert(numSortCols == 1)
val actualBucketByColumns =
StructType(
(0 until numBucketCols).map { index =>
- df.schema(metastoreTable.properties(s"spark.sql.sources.schema.bucketCol.$index"))
+ df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index"))
})
// Make sure bucketBy columns are correctly stored in metastore.
assert(
@@ -821,7 +821,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val actualSortByColumns =
StructType(
(0 until numSortCols).map { index =>
- df.schema(metastoreTable.properties(s"spark.sql.sources.schema.sortCol.$index"))
+ df.schema(metastoreTable.properties(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index"))
})
// Make sure sortBy columns are correctly stored in metastore.
assert(
@@ -913,7 +913,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTempDir { tempPath =>
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
- CreateDataSourceTableUtils.createDataSourceTable(
+ createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
@@ -928,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
.forall(column => CatalystSqlParser.parseDataType(column.dataType) == StringType))
- CreateDataSourceTableUtils.createDataSourceTable(
+ createDataSourceTable(
sparkSession = spark,
tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
@@ -960,10 +960,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
)
val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
- assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBuckets"))
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numBucketCols"))
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1)
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETS))
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMBUCKETCOLS))
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
checkAnswer(table("t"), Row(2, 1))
}
@@ -984,10 +984,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
)
val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
- assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
- assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
- assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1)
checkAnswer(table("t"), Row(1, 2))
}
@@ -1006,10 +1006,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
)
val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numPartCols"))
- assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
- assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
- assert(!metastoreTable.properties.contains("spark.sql.sources.schema.numSortCols"))
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMPARTCOLS))
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+ assert(!metastoreTable.properties.contains(DATASOURCE_SCHEMA_NUMSORTCOLS))
checkAnswer(table("t"), Row(1, 2))
}
@@ -1031,10 +1031,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
)
val metastoreTable = sharedState.externalCatalog.getTable("default", "t")
- assert(metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt === 1)
- assert(metastoreTable.properties("spark.sql.sources.schema.numBuckets").toInt === 2)
- assert(metastoreTable.properties("spark.sql.sources.schema.numBucketCols").toInt === 1)
- assert(metastoreTable.properties("spark.sql.sources.schema.numSortCols").toInt === 1)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMPARTCOLS).toInt === 1)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETS).toInt === 2)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMBUCKETCOLS).toInt === 1)
+ assert(metastoreTable.properties(DATASOURCE_SCHEMA_NUMSORTCOLS).toInt === 1)
checkAnswer(table("t"), Row(2, 3, 1))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 6f374d713b..741abcb751 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -101,24 +101,22 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
test("show tblproperties of data source tables - basic") {
checkAnswer(
- sql("SHOW TBLPROPERTIES parquet_tab1")
- .filter(s"key = 'spark.sql.sources.provider'"),
- Row("spark.sql.sources.provider", "org.apache.spark.sql.parquet.DefaultSource") :: Nil
+ sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_PROVIDER'"),
+ Row(DATASOURCE_PROVIDER, "org.apache.spark.sql.parquet.DefaultSource") :: Nil
)
checkAnswer(
- sql("SHOW TBLPROPERTIES parquet_tab1(spark.sql.sources.provider)"),
+ sql(s"SHOW TBLPROPERTIES parquet_tab1($DATASOURCE_PROVIDER)"),
Row("org.apache.spark.sql.parquet.DefaultSource") :: Nil
)
checkAnswer(
- sql("SHOW TBLPROPERTIES parquet_tab1")
- .filter(s"key = 'spark.sql.sources.schema.numParts'"),
- Row("spark.sql.sources.schema.numParts", "1") :: Nil
+ sql("SHOW TBLPROPERTIES parquet_tab1").filter(s"key = '$DATASOURCE_SCHEMA_NUMPARTS'"),
+ Row(DATASOURCE_SCHEMA_NUMPARTS, "1") :: Nil
)
checkAnswer(
- sql("SHOW TBLPROPERTIES parquet_tab1('spark.sql.sources.schema.numParts')"),
+ sql(s"SHOW TBLPROPERTIES parquet_tab1('$DATASOURCE_SCHEMA_NUMPARTS')"),
Row("1"))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0fa1841415..1fb777ade4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{sources, Row, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -144,7 +145,7 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val uniqueWriteJobId = configuration.get(DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)