Diffstat (limited to 'sql/core/src/main')
9 files changed, 45 insertions, 23 deletions
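Every hunk below is the same mechanical change: the catalog types' `locationUri` field switches from a raw `String` to `java.net.URI`, so call sites in sql/core now convert with `CatalogUtils.stringToURI` when storing a user-supplied location and with `CatalogUtils.URIToString` when rendering it back out (DESCRIBE output, the data source `path` option, SHOW CREATE TABLE). As a rough sketch of what those two helpers do — their real definitions live in catalyst's `CatalogUtils`, outside this diffstat's sql/core/src/main scope, so the bodies here are an assumption, not the committed code:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Hypothetical stand-in for catalyst's CatalogUtils, shown only to
    // illustrate the conversions this diff threads through sql/core.
    object CatalogUtilsSketch {
      // Building the URI through Hadoop's Path accepts bare, unencoded
      // path strings such as "/tmp/dir with space", which java.net.URI's
      // single-argument constructor would reject.
      def stringToURI(str: String): URI = new Path(str).toUri

      // The reverse direction keeps whatever encoding the URI carries.
      def URIToString(uri: URI): String = uri.toString
    }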
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 65df688689..c106163741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -386,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
           "you can only specify one of them.", ctx)
     }
-    val customLocation = storage.locationUri.orElse(location)
+    val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
 
     val tableType = if (customLocation.isDefined) {
       CatalogTableType.EXTERNAL
@@ -1080,8 +1080,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
     }
+
+    val locUri = location.map(CatalogUtils.stringToURI(_))
     val storage = CatalogStorageFormat(
-      locationUri = location,
+      locationUri = locUri,
       inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
       outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
       serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
@@ -1132,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
         // are empty Maps.
         val newTableDesc = tableDesc.copy(
-          storage = CatalogStorageFormat.empty.copy(locationUri = location),
+          storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
           provider = Some(conf.defaultDataSourceName))
         CreateTable(newTableDesc, mode, Some(q))
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d835b52116..3da66afced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
-    val pathOption = table.storage.locationUri.map("path" -> _)
+    val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
     // Fill in some default table options from the session conf
     val tableWithDefaultOptions = table.copy(
       identifier = table.identifier.copy(
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
   private def saveDataIntoTable(
       session: SparkSession,
       table: CatalogTable,
-      tableLocation: Option[String],
+      tableLocation: Option[URI],
       data: LogicalPlan,
       mode: SaveMode,
       tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> _)
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
     val dataSource = DataSource(
       session,
       className = table.provider.get,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 82cbb4aa47..b5c6042351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -66,7 +66,7 @@ case class CreateDatabaseCommand(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+        path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
         props),
       ifNotExists)
     Seq.empty[Row]
@@ -146,7 +146,7 @@ case class DescribeDatabaseCommand(
     val result =
       Row("Database Name", dbMetadata.name) ::
         Row("Description", dbMetadata.description) ::
-        Row("Location", dbMetadata.locationUri) :: Nil
+        Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: Nil
 
     if (extended) {
       val properties =
@@ -426,7 +426,8 @@ case class AlterTableAddPartitionCommand(
         table.identifier.quotedString,
         sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
-      CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
+      CatalogTablePartition(normalizedSpec, table.storage.copy(
+        locationUri = location.map(CatalogUtils.stringToURI(_))))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]
@@ -710,7 +711,7 @@ case class AlterTableRecoverPartitionsCommand(
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(
         spec,
-        table.storage.copy(locationUri = Some(location.toUri.toString)),
+        table.storage.copy(locationUri = Some(location.toUri)),
         params)
     }
     spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
@@ -741,6 +742,7 @@ case class AlterTableSetLocationCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val locUri = CatalogUtils.stringToURI(location)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
@@ -748,11 +750,11 @@
           sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
-        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
+        catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
     }
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 3e80916104..86394ff23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -79,7 +79,8 @@ case class CreateTableLikeCommand(
     CatalogTable(
       identifier = targetTable,
       tableType = tblType,
-      storage = sourceTableDesc.storage.copy(locationUri = location),
+      storage = sourceTableDesc.storage.copy(
+        locationUri = location.map(CatalogUtils.stringToURI(_))),
       schema = sourceTableDesc.schema,
       provider = newProvider,
       partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +496,8 @@ case class DescribeTableCommand(
     append(buffer, "Owner:", table.owner, "")
     append(buffer, "Create Time:", new Date(table.createTime).toString, "")
     append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
-    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Location:", table.storage.locationUri.map(CatalogUtils.URIToString(_))
+      .getOrElse(""), "")
     append(buffer, "Table Type:", table.tableType.name, "")
     table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
 
@@ -587,7 +589,8 @@ case class DescribeTableCommand(
     append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
     append(buffer, "Database:", table.database, "")
     append(buffer, "Table:", tableIdentifier.table, "")
-    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Location:", partition.storage.locationUri.map(CatalogUtils.URIToString(_))
+      .getOrElse(""), "")
     append(buffer, "Partition Parameters:", "", "")
     partition.parameters.foreach { case (key, value) =>
       append(buffer, s"  $key", value, "")
@@ -953,7 +956,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       // when the table creation DDL contains the PATH option.
       None
     } else {
-      Some(s"path '${escapeSingleQuotedString(location)}'")
+      Some(s"path '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 2068811661..d6c4b97ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -46,7 +48,7 @@ class CatalogFileIndex(
   assert(table.identifier.database.isDefined,
     "The table identifier must be qualified in CatalogFileIndex")
 
-  private val baseLocation: Option[String] = table.storage.locationUri
+  private val baseLocation: Option[URI] = table.storage.locationUri
 
   override def partitionSchema: StructType = table.partitionSchema
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4947dfda6f..c9384e4425 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -597,6 +597,7 @@ object DataSource {
   def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
     val path = CaseInsensitiveMap(options).get("path")
     val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
-    CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+    CatalogStorageFormat.empty.copy(
+      locationUri = path.map(CatalogUtils.stringToURI(_)), properties = optionsWithoutPath)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d6d7..bddf5af23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -21,13 +21,15 @@ import java.util.concurrent.Callable
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -220,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
-        val pathOption = table.storage.locationUri.map("path" -> _)
+        val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
           DataSource(
             sparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3d9f41832b..ed07ff3ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
 
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
@@ -77,7 +79,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     new Database(
       name = metadata.name,
       description = metadata.description,
-      locationUri = metadata.locationUri)
+      locationUri = CatalogUtils.URIToString(metadata.locationUri))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de45c..86129fa87f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
@@ -95,7 +96,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   // Create the default database if it doesn't exist.
   {
     val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
+      SessionCatalog.DEFAULT_DATABASE,
+      "default database",
+      CatalogUtils.stringToURI(warehousePath),
+      Map())
     // Initialize default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
       // There may be another Spark application creating default database at the same time, here we
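Taken together, the two directions should round-trip: a location written into the catalog through `stringToURI` and read back through `URIToString` comes out percent-encoded but otherwise intact. Against the sketch above (again an assumption, not Spark's own `CatalogUtils`):

    import java.net.URI

    // Special characters are percent-encoded on the way in...
    val u: URI = CatalogUtilsSketch.stringToURI("/tmp/dir with space")
    println(u)  // /tmp/dir%20with%20space

    // ...and an already-formed URI renders back unchanged.
    println(CatalogUtilsSketch.URIToString(new URI("file:/user/hive/warehouse/t")))
    // file:/user/hive/warehouse/t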