Diffstat (limited to 'sql/core/src/main/scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                 |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                    | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                 | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala   |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala         |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                     |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala                     |  6
9 files changed, 45 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 65df688689..c106163741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -386,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
"you can only specify one of them.", ctx)
}
- val customLocation = storage.locationUri.orElse(location)
+ val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
@@ -1080,8 +1080,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
}
+
+ val locUri = location.map(CatalogUtils.stringToURI(_))
val storage = CatalogStorageFormat(
- locationUri = location,
+ locationUri = locUri,
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
@@ -1132,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val newTableDesc = tableDesc.copy(
- storage = CatalogStorageFormat.empty.copy(locationUri = location),
+ storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
provider = Some(conf.defaultDataSourceName))
CreateTable(newTableDesc, mode, Some(q))
} else {
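
The parser now routes every user-supplied LOCATION (and the `path` key in OPTIONS) through CatalogUtils.stringToURI before storing it on CatalogStorageFormat. A minimal sketch of that conversion, assuming it is backed by Hadoop's Path; the helper name toLocationUri is hypothetical:

import java.net.URI
import org.apache.hadoop.fs.Path

// Hypothetical stand-in for CatalogUtils.stringToURI, assuming it is backed by
// Hadoop's Path: a scheme-less location keeps just its path component, while a
// fully qualified one keeps its scheme and authority.
def toLocationUri(location: String): URI = new Path(location).toUri

// toLocationUri("/tmp/warehouse/t1")                     -> URI "/tmp/warehouse/t1"
// toLocationUri("hdfs://nn:8020/user/hive/warehouse/t1") -> URI "hdfs://nn:8020/user/hive/warehouse/t1"
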
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d835b52116..3da66afced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.execution.command
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
- val pathOption = table.storage.locationUri.map("path" -> _)
+ val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
// Fill in some default table options from the session conf
val tableWithDefaultOptions = table.copy(
identifier = table.identifier.copy(
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
private def saveDataIntoTable(
session: SparkSession,
table: CatalogTable,
- tableLocation: Option[String],
+ tableLocation: Option[URI],
data: LogicalPlan,
mode: SaveMode,
tableExists: Boolean): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
- val pathOption = tableLocation.map("path" -> _)
+ val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
val dataSource = DataSource(
session,
className = table.provider.get,
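
Going the other way, the data source `path` option remains a plain String, so the stored URI is rendered back to text when the options are assembled. A small sketch of that pattern with made-up values, not read from a real metastore:

import java.net.URI

// Hypothetical location, standing in for table.storage.locationUri.
val locationUri: Option[URI] = Some(new URI("hdfs://nn:8020/user/hive/warehouse/t1"))

// The "path" data source option is still a String, so the URI is rendered back
// to text here, roughly what CatalogUtils.URIToString is used for above.
val pathOption: Option[(String, String)] = locationUri.map(uri => "path" -> uri.toString)
val options: Map[String, String] = Map("mergeSchema" -> "false") ++ pathOption
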
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 82cbb4aa47..b5c6042351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -66,7 +66,7 @@ case class CreateDatabaseCommand(
CatalogDatabase(
databaseName,
comment.getOrElse(""),
- path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+ path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
props),
ifNotExists)
Seq.empty[Row]
@@ -146,7 +146,7 @@ case class DescribeDatabaseCommand(
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
- Row("Location", dbMetadata.locationUri) :: Nil
+ Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: Nil
if (extended) {
val properties =
@@ -426,7 +426,8 @@ case class AlterTableAddPartitionCommand(
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
// inherit table storage format (possibly except for location)
- CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
+ CatalogTablePartition(normalizedSpec, table.storage.copy(
+ locationUri = location.map(CatalogUtils.stringToURI(_))))
}
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
@@ -710,7 +711,7 @@ case class AlterTableRecoverPartitionsCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(
spec,
- table.storage.copy(locationUri = Some(location.toUri.toString)),
+ table.storage.copy(locationUri = Some(location.toUri)),
params)
}
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
@@ -741,6 +742,7 @@ case class AlterTableSetLocationCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val locUri = CatalogUtils.stringToURI(location)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
@@ -748,11 +750,11 @@ case class AlterTableSetLocationCommand(
sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(table.identifier, spec)
- val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
+ val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
- catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
+ catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
}
Seq.empty[Row]
}
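
ALTER TABLE ... SET LOCATION now converts the incoming location string exactly once and copies the resulting URI into either the partition's or the table's storage descriptor. A toy model of that copy pattern, using simplified case classes rather than Spark's own:

import java.net.URI
import org.apache.hadoop.fs.Path

// Simplified stand-ins for CatalogStorageFormat and CatalogTablePartition, only
// to illustrate the copy pattern; these are not Spark's classes.
case class StorageFormat(locationUri: Option[URI] = None)
case class Partition(spec: Map[String, String], storage: StorageFormat)

val location = "hdfs://nn:8020/warehouse/t1/ds=2017-01-01"
val locUri = new Path(location).toUri  // converted once, as in the command above

val part = Partition(Map("ds" -> "2017-01-01"), StorageFormat())
val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
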
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 3e80916104..86394ff23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -79,7 +79,8 @@ case class CreateTableLikeCommand(
CatalogTable(
identifier = targetTable,
tableType = tblType,
- storage = sourceTableDesc.storage.copy(locationUri = location),
+ storage = sourceTableDesc.storage.copy(
+ locationUri = location.map(CatalogUtils.stringToURI(_))),
schema = sourceTableDesc.schema,
provider = newProvider,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +496,8 @@ case class DescribeTableCommand(
append(buffer, "Owner:", table.owner, "")
append(buffer, "Create Time:", new Date(table.createTime).toString, "")
append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
- append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+ append(buffer, "Location:", table.storage.locationUri.map(CatalogUtils.URIToString(_))
+ .getOrElse(""), "")
append(buffer, "Table Type:", table.tableType.name, "")
table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
@@ -587,7 +589,8 @@ case class DescribeTableCommand(
append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
append(buffer, "Database:", table.database, "")
append(buffer, "Table:", tableIdentifier.table, "")
- append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
+ append(buffer, "Location:", partition.storage.locationUri.map(CatalogUtils.URIToString(_))
+ .getOrElse(""), "")
append(buffer, "Partition Parameters:", "", "")
partition.parameters.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
@@ -953,7 +956,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
// when the table creation DDL contains the PATH option.
None
} else {
- Some(s"path '${escapeSingleQuotedString(location)}'")
+ Some(s"path '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'")
}
}
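
For user-facing output such as DESCRIBE and SHOW CREATE TABLE, the URI is always rendered back to a string first. A rough sketch of how the PATH clause above could be assembled, with a hypothetical escape helper standing in for escapeSingleQuotedString:

import java.net.URI

// Hypothetical escaping helper; Spark uses its own escapeSingleQuotedString.
def escapeSingleQuoted(s: String): String = s.replace("'", "\\'")

val location = new URI("file:/data/tables/t1")
// Rendered the way ShowCreateTableCommand builds the PATH option above:
val pathClause = s"path '${escapeSingleQuoted(location.toString)}'"
// -> path 'file:/data/tables/t1'
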
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 2068811661..d6c4b97ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.net.URI
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -46,7 +48,7 @@ class CatalogFileIndex(
assert(table.identifier.database.isDefined,
"The table identifier must be qualified in CatalogFileIndex")
- private val baseLocation: Option[String] = table.storage.locationUri
+ private val baseLocation: Option[URI] = table.storage.locationUri
override def partitionSchema: StructType = table.partitionSchema
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4947dfda6f..c9384e4425 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -597,6 +597,7 @@ object DataSource {
def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
val path = CaseInsensitiveMap(options).get("path")
val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
- CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+ CatalogStorageFormat.empty.copy(
+ locationUri = path.map(CatalogUtils.stringToURI(_)), properties = optionsWithoutPath)
}
}
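
buildStorageFormatFromOptions looks up the path option case-insensitively, drops it from the remaining properties, and stores the location as a URI. A rough standalone equivalent, without Spark's CaseInsensitiveMap or CatalogStorageFormat:

import java.net.URI
import org.apache.hadoop.fs.Path

val options = Map("Path" -> "/data/t1", "header" -> "true")

// Case-insensitive lookup of "path", mirroring CaseInsensitiveMap(options).get("path").
val path: Option[String] =
  options.collectFirst { case (k, v) if k.equalsIgnoreCase("path") => v }
val optionsWithoutPath = options.filter { case (k, _) => !k.equalsIgnoreCase("path") }

// The location, if any, is now stored as a URI rather than a raw String.
val locationUri: Option[URI] = path.map(p => new Path(p).toUri)
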
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d6d7..bddf5af23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -21,13 +21,15 @@ import java.util.concurrent.Callable
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -220,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
- val pathOption = table.storage.locationUri.map("path" -> _)
+ val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
DataSource(
sparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3d9f41832b..ed07ff3ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
import scala.reflect.runtime.universe.TypeTag
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
@@ -77,7 +79,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
new Database(
name = metadata.name,
description = metadata.description,
- locationUri = metadata.locationUri)
+ locationUri = CatalogUtils.URIToString(metadata.locationUri))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de45c..86129fa87f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
@@ -95,7 +96,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
// Create the default database if it doesn't exist.
{
val defaultDbDefinition = CatalogDatabase(
- SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
+ SessionCatalog.DEFAULT_DATABASE,
+ "default database",
+ CatalogUtils.stringToURI(warehousePath),
+ Map())
// Initialize default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
// There may be another Spark application creating default database at the same time, here we
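
Finally, SharedState now records the warehouse path as a URI in the default database definition. A minimal sketch of that conversion, with a placeholder warehouse path and Hadoop's Path standing in for CatalogUtils.stringToURI:

import java.net.URI
import org.apache.hadoop.fs.Path

// Placeholder warehouse path; in Spark this comes from spark.sql.warehouse.dir.
val warehousePath = "/user/hive/warehouse"
val warehouseUri: URI = new Path(warehousePath).toUri

// The default database definition is then built around this URI, as in the hunk
// above, instead of passing the raw warehouse string through.
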