aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwindpiger <songjun@outlook.com>2017-03-06 10:44:26 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-06 10:44:26 -0800
commit096df6d933c5326e5782aa8c5de842a0800eb369 (patch)
treea126f9307afbcac51f61b1b6038d541efac2a49c
parent46a64d1e0ae12c31e848f377a84fb28e3efb3699 (diff)
downloadspark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.gz
spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.bz2
spark-096df6d933c5326e5782aa8c5de842a0800eb369.zip
[SPARK-19257][SQL] location for table/partition/database should be java.net.URI
## What changes were proposed in this pull request? Currently we treat the location of table/partition/database as URI string. It will be safer if we can make the type of location as java.net.URI. In this PR, there are following classes changes: **1. CatalogDatabase** ``` case class CatalogDatabase( name: String, description: String, locationUri: String, properties: Map[String, String]) ---> case class CatalogDatabase( name: String, description: String, locationUri: URI, properties: Map[String, String]) ``` **2. CatalogStorageFormat** ``` case class CatalogStorageFormat( locationUri: Option[String], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ----> case class CatalogStorageFormat( locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ``` Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally. Here list some operation related location: **1. whitespace in the location** e.g. `/a/b c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` , then `DESC EXTENDED t ` show the location is `/a/b c/d`, and the real path in the FileSystem also show `/a/b c/d` **2. colon(:) in the location** e.g. `/a/b:c/d` For both table location and partition location, when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` , **In linux file system** `DESC EXTENDED t ` show the location is `/a/b:c/d`, and the real path in the FileSystem also show `/a/b:c/d` **in HDFS** throw exception: `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.` **while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`, and the real path in the FileSystem also show `/xxx/a=a%3Ab` **3. percent sign(%) in the location** e.g. `/a/b%c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%c/d`, and the real path in the FileSystem also show `/a/b%c/d` **4. encoded(%25) in the location** e.g. `/a/b%25c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%25c/d`, and the real path in the FileSystem also show `/a/b%25c/d` **while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`, and the real path in the FileSystem also show `/xxx/a=%2525` **Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173) ### Summary: After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`, the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ). `DataBase` also have the same logic with `CREATE TABLE` while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem` In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString` which transfrom `str to uri `or `uri to str`. for example: ``` val str = '/a/b c/d' val uri = new Path(str).toUri --> '/a/b%20c/d' val strFromUri = new Path(uri).toString -> '/a/b c/d' ``` when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri ` ## How was this patch tested? unit test added. The `current master branch` also `passed all the test cases` added in this PR by a litter change. https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764 here `toURI` -> `toString` when test in master branch. This can show that this PR is transparent for user. Author: windpiger <songjun@outlook.com> Closes #17149 from windpiger/changeStringToURI.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala26
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala12
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala15
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala14
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala136
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala12
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala21
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala15
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala9
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala23
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala171
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala4
33 files changed, 460 insertions, 155 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 58ced549ba..a418edc302 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell
@@ -162,6 +164,30 @@ object CatalogUtils {
BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
}
+ /**
+ * Convert URI to String.
+ * Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
+ * Here we create a hadoop Path with the given URI, and rely on Path.toString
+ * to decode the uri
+ * @param uri the URI of the path
+ * @return the String of the path
+ */
+ def URIToString(uri: URI): String = {
+ new Path(uri).toString
+ }
+
+ /**
+ * Convert String to URI.
+ * Since new URI(string) does not encode string, e.g. change '%' to '%25'.
+ * Here we create a hadoop Path with the given String, and rely on Path.toUri
+ * to encode the string
+ * @param str the String of the path
+ * @return the URI of the path
+ */
+ def stringToURI(str: String): URI = {
+ new Path(str).toUri
+ }
+
private def normalizeColumnName(
tableName: String,
tableCols: Seq[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 340e8451f1..80aba4af94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -202,7 +202,7 @@ class InMemoryCatalog(
tableDefinition.storage.locationUri.isEmpty
val tableWithLocation = if (needDefaultTableLocation) {
- val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+ val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
try {
val fs = defaultTableLocation.getFileSystem(hadoopConfig)
fs.mkdirs(defaultTableLocation)
@@ -211,7 +211,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to create table $table as failed " +
s"to create its directory $defaultTableLocation", e)
}
- tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+ tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
} else {
tableDefinition
}
@@ -274,7 +274,7 @@ class InMemoryCatalog(
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
- val newDir = new Path(catalog(db).db.locationUri, newName)
+ val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
}
- oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+ oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}
catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@ class InMemoryCatalog(
existingParts.put(
p.spec,
- p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+ p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
}
}
@@ -462,7 +462,7 @@ class InMemoryCatalog(
}
oldPartition.copy(
spec = newSpec,
- storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+ storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
} else {
oldPartition.copy(spec = newSpec)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f6412e42c1..498bfbde9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -131,10 +132,10 @@ class SessionCatalog(
* does not contain a scheme, this path will not be changed after the default
* FileSystem is changed.
*/
- private def makeQualifiedPath(path: String): Path = {
+ private def makeQualifiedPath(path: URI): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
- fs.makeQualified(hadoopPath)
+ fs.makeQualified(hadoopPath).toUri
}
private def requireDbExists(db: String): Unit = {
@@ -170,7 +171,7 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
- val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
@@ -228,9 +229,9 @@ class SessionCatalog(
* Get the path for creating a non-default database when database location is not provided
* by users.
*/
- def getDefaultDBPath(db: String): String = {
+ def getDefaultDBPath(db: String): URI = {
val database = formatDatabaseName(db)
- new Path(new Path(conf.warehousePath), database + ".db").toString
+ new Path(new Path(conf.warehousePath), database + ".db").toUri
}
// ----------------------------------------------------------------------------
@@ -351,11 +352,11 @@ class SessionCatalog(
db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
}
- def defaultTablePath(tableIdent: TableIdentifier): String = {
+ def defaultTablePath(tableIdent: TableIdentifier): URI = {
val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
val dbLocation = getDatabaseMetadata(dbName).locationUri
- new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+ new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
}
// ----------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 887caf07d1..4452c47987 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
import java.util.Date
import com.google.common.base.Objects
@@ -48,10 +49,7 @@ case class CatalogFunction(
* Storage format, used to describe how a partition or a table is stored.
*/
case class CatalogStorageFormat(
- // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
- // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
- // path.toUri respectively before use as a filesystem path due to URI char escaping.
- locationUri: Option[String],
+ locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
@@ -105,7 +103,7 @@ case class CatalogTablePartition(
}
/** Return the partition location, assuming it is specified. */
- def location: String = storage.locationUri.getOrElse {
+ def location: URI = storage.locationUri.getOrElse {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
}
@@ -210,7 +208,7 @@ case class CatalogTable(
}
/** Return the table location, assuming it is specified. */
- def location: String = storage.locationUri.getOrElse {
+ def location: URI = storage.locationUri.getOrElse {
throw new AnalysisException(s"table $identifier did not specify locationUri")
}
@@ -241,7 +239,7 @@ case class CatalogTable(
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
- locationUri: Option[String] = storage.locationUri,
+ locationUri: Option[URI] = storage.locationUri,
inputFormat: Option[String] = storage.inputFormat,
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
@@ -337,7 +335,7 @@ object CatalogTableType {
case class CatalogDatabase(
name: String,
description: String,
- locationUri: String,
+ locationUri: URI,
properties: Map[String, String])
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a065..07ccd68698 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
@@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
"db1",
"tbl",
Map("partCol1" -> "1", "partCol2" -> "2")).location
- val tableLocation = catalog.getTable("db1", "tbl").location
+ val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
assert(new Path(partitionLocation) == defaultPartitionLocation)
}
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)
- val tableLocation = catalog.getTable("db1", "tbl").location
+ val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
val mixedCasePart1 = CatalogTablePartition(
Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// File System operations
// --------------------------------------------------------------------------
- private def exists(uri: String, children: String*): Boolean = {
+ private def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
- Some(Utils.createTempDir().getAbsolutePath),
+ Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithExistingDir = CatalogTablePartition(
Map("partCol1" -> "7", "partCol2" -> "8"),
CatalogStorageFormat(
- Some(tempPath.toURI.toString),
+ Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithNonExistingDir = CatalogTablePartition(
Map("partCol1" -> "9", "partCol2" -> "10"),
CatalogStorageFormat(
- Some(tempPath.toURI.toString),
+ Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
assert(tempPath.exists())
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils {
def newFunc(): CatalogFunction = newFunc("funcName")
- def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
+ def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
def newDb(name: String): CatalogDatabase = {
CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
CatalogTable(
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
- storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
+ storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 65df688689..c106163741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -386,7 +386,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
"LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
"you can only specify one of them.", ctx)
}
- val customLocation = storage.locationUri.orElse(location)
+ val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI(_)))
val tableType = if (customLocation.isDefined) {
CatalogTableType.EXTERNAL
@@ -1080,8 +1080,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
}
+
+ val locUri = location.map(CatalogUtils.stringToURI(_))
val storage = CatalogStorageFormat(
- locationUri = location,
+ locationUri = locUri,
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
@@ -1132,7 +1134,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
val newTableDesc = tableDesc.copy(
- storage = CatalogStorageFormat.empty.copy(locationUri = location),
+ storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
provider = Some(conf.defaultDataSourceName))
CreateTable(newTableDesc, mode, Some(q))
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d835b52116..3da66afced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.execution.command
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
- val pathOption = table.storage.locationUri.map("path" -> _)
+ val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
// Fill in some default table options from the session conf
val tableWithDefaultOptions = table.copy(
identifier = table.identifier.copy(
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
private def saveDataIntoTable(
session: SparkSession,
table: CatalogTable,
- tableLocation: Option[String],
+ tableLocation: Option[URI],
data: LogicalPlan,
mode: SaveMode,
tableExists: Boolean): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
- val pathOption = tableLocation.map("path" -> _)
+ val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
val dataSource = DataSource(
session,
className = table.provider.get,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 82cbb4aa47..b5c6042351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -66,7 +66,7 @@ case class CreateDatabaseCommand(
CatalogDatabase(
databaseName,
comment.getOrElse(""),
- path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+ path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
props),
ifNotExists)
Seq.empty[Row]
@@ -146,7 +146,7 @@ case class DescribeDatabaseCommand(
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
- Row("Location", dbMetadata.locationUri) :: Nil
+ Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: Nil
if (extended) {
val properties =
@@ -426,7 +426,8 @@ case class AlterTableAddPartitionCommand(
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
// inherit table storage format (possibly except for location)
- CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
+ CatalogTablePartition(normalizedSpec, table.storage.copy(
+ locationUri = location.map(CatalogUtils.stringToURI(_))))
}
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
Seq.empty[Row]
@@ -710,7 +711,7 @@ case class AlterTableRecoverPartitionsCommand(
// inherit table storage format (possibly except for location)
CatalogTablePartition(
spec,
- table.storage.copy(locationUri = Some(location.toUri.toString)),
+ table.storage.copy(locationUri = Some(location.toUri)),
params)
}
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
@@ -741,6 +742,7 @@ case class AlterTableSetLocationCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
+ val locUri = CatalogUtils.stringToURI(location)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
@@ -748,11 +750,11 @@ case class AlterTableSetLocationCommand(
sparkSession, table, "ALTER TABLE ... SET LOCATION")
// Partition spec is specified, so we set the location only for this partition
val part = catalog.getPartition(table.identifier, spec)
- val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
+ val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
- catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
+ catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
}
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 3e80916104..86394ff23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -79,7 +79,8 @@ case class CreateTableLikeCommand(
CatalogTable(
identifier = targetTable,
tableType = tblType,
- storage = sourceTableDesc.storage.copy(locationUri = location),
+ storage = sourceTableDesc.storage.copy(
+ locationUri = location.map(CatalogUtils.stringToURI(_))),
schema = sourceTableDesc.schema,
provider = newProvider,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -495,7 +496,8 @@ case class DescribeTableCommand(
append(buffer, "Owner:", table.owner, "")
append(buffer, "Create Time:", new Date(table.createTime).toString, "")
append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
- append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+ append(buffer, "Location:", table.storage.locationUri.map(CatalogUtils.URIToString(_))
+ .getOrElse(""), "")
append(buffer, "Table Type:", table.tableType.name, "")
table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
@@ -587,7 +589,8 @@ case class DescribeTableCommand(
append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
append(buffer, "Database:", table.database, "")
append(buffer, "Table:", tableIdentifier.table, "")
- append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
+ append(buffer, "Location:", partition.storage.locationUri.map(CatalogUtils.URIToString(_))
+ .getOrElse(""), "")
append(buffer, "Partition Parameters:", "", "")
partition.parameters.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
@@ -953,7 +956,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
// when the table creation DDL contains the PATH option.
None
} else {
- Some(s"path '${escapeSingleQuotedString(location)}'")
+ Some(s"path '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 2068811661..d6c4b97ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.net.URI
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -46,7 +48,7 @@ class CatalogFileIndex(
assert(table.identifier.database.isDefined,
"The table identifier must be qualified in CatalogFileIndex")
- private val baseLocation: Option[String] = table.storage.locationUri
+ private val baseLocation: Option[URI] = table.storage.locationUri
override def partitionSchema: StructType = table.partitionSchema
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 4947dfda6f..c9384e4425 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -597,6 +597,7 @@ object DataSource {
def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
val path = CaseInsensitiveMap(options).get("path")
val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
- CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+ CatalogStorageFormat.empty.copy(
+ locationUri = path.map(CatalogUtils.stringToURI(_)), properties = optionsWithoutPath)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d6d7..bddf5af23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -21,13 +21,15 @@ import java.util.concurrent.Callable
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -220,7 +222,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
- val pathOption = table.storage.locationUri.map("path" -> _)
+ val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
DataSource(
sparkSession,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 3d9f41832b..ed07ff3ff0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
import scala.reflect.runtime.universe.TypeTag
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
@@ -77,7 +79,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
new Database(
name = metadata.name,
description = metadata.description,
- locationUri = metadata.locationUri)
+ locationUri = CatalogUtils.URIToString(metadata.locationUri))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de45c..86129fa87f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
@@ -95,7 +96,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
// Create the default database if it doesn't exist.
{
val defaultDbDefinition = CatalogDatabase(
- SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
+ SessionCatalog.DEFAULT_DATABASE,
+ "default database",
+ CatalogUtils.stringToURI(warehousePath),
+ Map())
// Initialize default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
// There may be another Spark application creating default database at the same time, here we
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 76bb9e5929..4b73b078da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.command
+import java.net.URI
+
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -317,7 +319,7 @@ class DDLCommandSuite extends PlanTest {
val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
val ct = parseAs[CreateTable](query)
assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
- assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
+ assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything")))
}
test("create hive table - property values must be set") {
@@ -334,7 +336,7 @@ class DDLCommandSuite extends PlanTest {
val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
val ct = parseAs[CreateTable](query)
assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
- assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
+ assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything")))
}
test("create table - with partitioned by") {
@@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest {
val expectedTableDesc = CatalogTable(
identifier = TableIdentifier("my_tab"),
tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")),
+ storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))),
schema = new StructType().add("a", IntegerType).add("b", StringType),
provider = Some("parquet"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8b8cd0fdf4..6ffa58bcd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,9 +26,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -72,7 +70,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
catalog.createDatabase(
- CatalogDatabase(name, "", spark.sessionState.conf.warehousePath, Map()),
+ CatalogDatabase(
+ name, "", CatalogUtils.stringToURI(spark.sessionState.conf.warehousePath), Map()),
ignoreIfExists = false)
}
@@ -133,11 +132,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
- private def makeQualifiedPath(path: String): String = {
+ private def makeQualifiedPath(path: String): URI = {
// copy-paste from SessionCatalog
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration)
- fs.makeQualified(hadoopPath).toString
+ fs.makeQualified(hadoopPath).toUri
}
test("Create Database using Default Warehouse Path") {
@@ -449,7 +448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
- Row("Location", location) ::
+ Row("Location", CatalogUtils.URIToString(location)) ::
Row("Properties", "") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
@@ -458,7 +457,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
- Row("Location", location) ::
+ Row("Location", CatalogUtils.URIToString(location)) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -467,7 +466,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
- Row("Location", location) ::
+ Row("Location", CatalogUtils.URIToString(location)) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
@@ -1094,7 +1093,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
// Verify that the location is set to the expected string
- def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
+ def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
.getOrElse { catalog.getTableMetadata(tableIdent).storage }
@@ -1111,17 +1110,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
- verifyLocation("/path/to/your/lovely/heart")
+ verifyLocation(new URI("/path/to/your/lovely/heart"))
// set table partition location
sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
- verifyLocation("/path/to/part/ways", Some(partSpec))
+ verifyLocation(new URI("/path/to/part/ways"), Some(partSpec))
// set table location without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
- verifyLocation("/swanky/steak/place")
+ verifyLocation(new URI("/swanky/steak/place"))
// set table partition location without explicitly specifying database
sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
- verifyLocation("vienna", Some(partSpec))
+ verifyLocation(new URI("vienna"), Some(partSpec))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
@@ -1255,7 +1254,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
"PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris")))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
// add partitions without explicitly specifying database
@@ -1819,7 +1818,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// SET LOCATION won't move data from previous table path to new table path.
assert(spark.table("tbl").count() == 0)
// the previous table path should be still there.
- assert(new File(new URI(defaultTablePath)).exists())
+ assert(new File(defaultTablePath).exists())
sql("INSERT INTO tbl SELECT 2")
checkAnswer(spark.table("tbl"), Row(2))
@@ -1843,28 +1842,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
dir.delete
- val tableLocFile = new File(table.location)
- assert(!tableLocFile.exists)
+ assert(!dir.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
- assert(tableLocFile.exists)
+ assert(dir.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
Utils.deleteRecursively(dir)
- assert(!tableLocFile.exists)
+ assert(!dir.exists)
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
- assert(tableLocFile.exists)
+ assert(dir.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
val newDirFile = new File(dir, "x")
- val newDir = newDirFile.toURI.toString
+ val newDir = newDirFile.getAbsolutePath
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table1.location == newDir)
+ assert(table1.location == new URI(newDir))
assert(!newDirFile.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
@@ -1885,7 +1883,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1911,13 +1909,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
dir.delete()
checkAnswer(spark.table("t"), Nil)
val newDirFile = new File(dir, "x")
- val newDir = newDirFile.toURI.toString
+ val newDir = newDirFile.toURI
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
@@ -1967,7 +1965,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1986,7 +1984,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
@@ -1996,4 +1994,84 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}
+
+ Seq("a b", "a:b", "a%b").foreach { specialChars =>
+ test(s"location uri contains $specialChars for datasource table") {
+ withTable("t", "t1") {
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t(a string)
+ |USING parquet
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ spark.sql("INSERT INTO TABLE t SELECT 1")
+ assert(loc.listFiles().length >= 1)
+ checkAnswer(spark.table("t"), Row("1") :: Nil)
+ }
+
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a string, b string)
+ |USING parquet
+ |PARTITIONED BY(b)
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+ val partFile = new File(loc, "b=2")
+ assert(partFile.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+ val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+ assert(!partFile1.exists())
+ val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+ assert(partFile2.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
+ }
+ }
+ }
+ }
+
+ Seq("a b", "a:b", "a%b").foreach { specialChars =>
+ test(s"location uri contains $specialChars for database") {
+ try {
+ withTable("t") {
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
+ spark.sql("USE tmpdb")
+
+ import testImplicits._
+ Seq(1).toDF("a").write.saveAsTable("t")
+ val tblloc = new File(loc, "t")
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ val tblPath = new Path(tblloc.getAbsolutePath)
+ val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(table.location == fs.makeQualified(tblPath).toUri)
+ assert(tblloc.listFiles().nonEmpty)
+ }
+ }
+ } finally {
+ spark.sql("DROP DATABASE IF EXISTS tmpdb")
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 75723d0abc..989a7f2698 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -459,7 +459,7 @@ class CatalogSuite
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
- assert(table.storage.locationUri.get == dir.getAbsolutePath)
+ assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
Seq((1)).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
@@ -481,7 +481,7 @@ class CatalogSuite
options = Map.empty[String, String])
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.MANAGED)
- val tablePath = new File(new URI(table.storage.locationUri.get))
+ val tablePath = new File(table.storage.locationUri.get)
assert(tablePath.exists() && tablePath.listFiles().isEmpty)
Seq((1)).toDF("i").write.insertInto("t")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 9082261af7..93f3efe2cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -92,7 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
def tableDir: File = {
val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
- new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
+ new File(spark.sessionState.catalog.defaultTablePath(identifier))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index faf9afc49a..7ab339e005 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.sources
+import java.net.URI
+
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType}
@@ -78,7 +81,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
// should exist even path option is not specified when creating table
withTable("src") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
- assert(getPathOption("src") == Some(defaultTablePath("src")))
+ assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src"))))
}
}
@@ -105,7 +108,8 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|AS SELECT 1
""".stripMargin)
- assert(spark.table("src").schema.head.metadata.getString("path") == defaultTablePath("src"))
+ assert(spark.table("src").schema.head.metadata.getString("path") ==
+ CatalogUtils.URIToString(defaultTablePath("src")))
}
}
@@ -123,7 +127,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
withTable("src", "src2") {
sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
sql("ALTER TABLE src RENAME TO src2")
- assert(getPathOption("src2") == Some(defaultTablePath("src2")))
+ assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
}
}
@@ -133,7 +137,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
}.head
}
- private def defaultTablePath(tableName: String): String = {
+ private def defaultTablePath(tableName: String): URI = {
spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 43d9c2bec6..9ab4624594 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
tableDefinition.storage.locationUri.isEmpty
val tableLocation = if (needDefaultTableLocation) {
- Some(defaultTablePath(tableDefinition.identifier))
+ Some(CatalogUtils.stringToURI(defaultTablePath(tableDefinition.identifier)))
} else {
tableDefinition.storage.locationUri
}
@@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// However, in older version of Spark we already store table location in storage properties
// with key "path". Here we keep this behaviour for backward compatibility.
val storagePropsWithLocation = table.storage.properties ++
- table.storage.locationUri.map("path" -> _)
+ table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
// converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
// bucket specification to empty. Note that partition columns are retained, so that we can
@@ -285,7 +285,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// compatible format, which means the data source is file-based and must have a `path`.
require(table.storage.locationUri.isDefined,
"External file-based data source table must have a `path` entry in storage properties.")
- Some(new Path(table.location).toUri.toString)
+ Some(table.location)
} else {
None
}
@@ -432,13 +432,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
//
// Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
val tempPath = {
- val dbLocation = getDatabase(tableDefinition.database).locationUri
+ val dbLocation = new Path(getDatabase(tableDefinition.database).locationUri)
new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
}
try {
client.createTable(
- tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
+ tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)),
ignoreIfExists)
} finally {
FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
@@ -563,7 +563,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// want to alter the table location to a file path, we will fail. This should be fixed
// in the future.
- val newLocation = tableDefinition.storage.locationUri
+ val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_))
val storageWithPathOption = tableDefinition.storage.copy(
properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
@@ -704,7 +704,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val storageWithLocation = {
val tableLocation = getLocationFromStorageProps(table)
// We pass None as `newPath` here, to remove the path option in storage properties.
- updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+ updateLocationInStorageProps(table, newPath = None).copy(
+ locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
}
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
@@ -848,10 +849,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// However, Hive metastore is not case preserving and will generate wrong partition location
// with lower cased partition column names. Here we set the default partition location
// manually to avoid this problem.
- val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
+ val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse {
ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
}
- p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
+ p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
}
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
@@ -890,7 +891,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val newParts = newSpecs.map { spec =>
val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
- partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+ partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
}
alterPartitions(db, table, newParts)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151a69aebf..4d3b6c3cec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -128,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
- val tablePath = new Path(new URI(relation.tableMeta.location))
+ val tablePath = new Path(relation.tableMeta.location)
val result = if (relation.isPartitioned) {
val partitionSchema = relation.tableMeta.partitionSchema
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
@@ -141,7 +141,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// locations,_omitting_ the table's base path.
val paths = sparkSession.sharedState.externalCatalog
.listPartitions(tableIdentifier.database, tableIdentifier.name)
- .map(p => new Path(new URI(p.storage.locationUri.get)))
+ .map(p => new Path(p.storage.locationUri.get))
if (paths.isEmpty) {
Seq(tablePath)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 624cfa206e..b5ce027d51 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -133,7 +133,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
} else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
- val tablePath = new Path(new URI(table.location))
+ val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
fs.getContentSummary(tablePath).getLength
} catch {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7acaa9a7ab..469c9d84de 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -317,7 +317,7 @@ private[hive] class HiveClientImpl(
new HiveDatabase(
database.name,
database.description,
- database.locationUri,
+ CatalogUtils.URIToString(database.locationUri),
Option(database.properties).map(_.asJava).orNull),
ignoreIfExists)
}
@@ -335,7 +335,7 @@ private[hive] class HiveClientImpl(
new HiveDatabase(
database.name,
database.description,
- database.locationUri,
+ CatalogUtils.URIToString(database.locationUri),
Option(database.properties).map(_.asJava).orNull))
}
@@ -344,7 +344,7 @@ private[hive] class HiveClientImpl(
CatalogDatabase(
name = d.getName,
description = d.getDescription,
- locationUri = d.getLocationUri,
+ locationUri = CatalogUtils.stringToURI(d.getLocationUri),
properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
}.getOrElse(throw new NoSuchDatabaseException(dbName))
}
@@ -410,7 +410,7 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
- locationUri = shim.getDataLocation(h),
+ locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI(_)),
// To avoid ClassNotFound exception, we try our best to not get the format class, but get
// the class name directly. However, for non-native tables, there is no interface to get
// the format class name, so we may still throw ClassNotFound in this case.
@@ -851,7 +851,8 @@ private[hive] object HiveClientImpl {
conf.foreach(c => hiveTable.setOwner(c.getUser))
hiveTable.setCreateTime((table.createTime / 1000).toInt)
hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
- table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
+ table.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach { loc =>
+ hiveTable.getTTable.getSd.setLocation(loc)}
table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
hiveTable.setSerializationLib(
@@ -885,7 +886,7 @@ private[hive] object HiveClientImpl {
}
val storageDesc = new StorageDescriptor
val serdeInfo = new SerDeInfo
- p.storage.locationUri.foreach(storageDesc.setLocation)
+ p.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach(storageDesc.setLocation)
p.storage.inputFormat.foreach(storageDesc.setInputFormat)
p.storage.outputFormat.foreach(storageDesc.setOutputFormat)
p.storage.serde.foreach(serdeInfo.setSerializationLib)
@@ -906,7 +907,7 @@ private[hive] object HiveClientImpl {
CatalogTablePartition(
spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
storage = CatalogStorageFormat(
- locationUri = Option(apiPartition.getSd.getLocation),
+ locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
inputFormat = Option(apiPartition.getSd.getInputFormat),
outputFormat = Option(apiPartition.getSd.getOutputFormat),
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 7280748361..c6188fc683 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -24,10 +24,9 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JS
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
-import scala.util.Try
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
@@ -41,7 +40,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegralType, StringType}
@@ -268,7 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
val table = hive.getTable(database, tableName)
parts.foreach { s =>
val location = s.storage.locationUri.map(
- uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
+ uri => new Path(table.getPath, new Path(uri))).orNull
val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
val spec = s.spec.asJava
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +462,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
parts.zipWithIndex.foreach { case (s, i) =>
addPartitionDesc.addPartition(
- s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
+ s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
if (s.parameters.nonEmpty) {
addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6d7a1c3937..490e02d0bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.net.URI
+
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -70,7 +72,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.identifier.database == Some("mydb"))
assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.EXTERNAL)
- assert(desc.storage.locationUri == Some("/user/external/page_view"))
+ assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
assert(desc.schema.isEmpty) // will be populated later when the table is actually created
assert(desc.comment == Some("This is the staging page view table"))
// TODO will be SQLText
@@ -102,7 +104,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.identifier.database == Some("mydb"))
assert(desc.identifier.table == "page_view")
assert(desc.tableType == CatalogTableType.EXTERNAL)
- assert(desc.storage.locationUri == Some("/user/external/page_view"))
+ assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
assert(desc.schema.isEmpty) // will be populated later when the table is actually created
// TODO will be SQLText
assert(desc.comment == Some("This is the staging page view table"))
@@ -338,7 +340,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'"
val (desc, _) = extractTableDesc(query)
assert(desc.tableType == CatalogTableType.EXTERNAL)
- assert(desc.storage.locationUri == Some("/path/to/nowhere"))
+ assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere")))
}
test("create table - if not exists") {
@@ -469,7 +471,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
assert(desc.viewText.isEmpty)
assert(desc.viewDefaultDatabase.isEmpty)
assert(desc.viewQueryColumnNames.isEmpty)
- assert(desc.storage.locationUri == Some("/path/to/mercury"))
+ assert(desc.storage.locationUri == Some(new URI("/path/to/mercury")))
assert(desc.storage.inputFormat == Some("winput"))
assert(desc.storage.outputFormat == Some("wowput"))
assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
@@ -644,7 +646,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
.add("id", "int")
.add("name", "string", nullable = true, comment = "blabla"))
assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
- assert(table.storage.locationUri == Some("/tmp/file"))
+ assert(table.storage.locationUri == Some(new URI("/tmp/file")))
assert(table.storage.properties == Map("my_prop" -> "1"))
assert(table.comment == Some("BLABLA"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index ee632d24b7..705d43f1f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -40,7 +40,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
val tempDir = Utils.createTempDir().getCanonicalFile
- val tempDirUri = tempDir.toURI.toString.stripSuffix("/")
+ val tempDirUri = tempDir.toURI
+ val tempDirStr = tempDir.getAbsolutePath
override def beforeEach(): Unit = {
sql("CREATE DATABASE test_db")
@@ -59,9 +60,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
}
private def defaultTableURI(tableName: String): URI = {
- val defaultPath =
- spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
- new Path(defaultPath).toUri
+ spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
}
// Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
@@ -170,8 +169,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
identifier = TableIdentifier("tbl7", Some("test_db")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"),
- properties = Map("path" -> tempDirUri)),
+ locationUri = Some(new URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")),
+ properties = Map("path" -> tempDirStr)),
schema = new StructType(),
provider = Some("json"),
properties = Map(
@@ -184,7 +183,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(
locationUri = Some(tempDirUri),
- properties = Map("path" -> tempDirUri)),
+ properties = Map("path" -> tempDirStr)),
schema = simpleSchema,
properties = Map(
"spark.sql.sources.provider" -> "parquet",
@@ -195,8 +194,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
identifier = TableIdentifier("tbl9", Some("test_db")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"),
- properties = Map("path" -> tempDirUri)),
+ locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")),
+ properties = Map("path" -> tempDirStr)),
schema = new StructType(),
provider = Some("json"),
properties = Map("spark.sql.sources.provider" -> "json"))
@@ -220,7 +219,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
if (tbl.tableType == CatalogTableType.EXTERNAL) {
// trim the URI prefix
- val tableLocation = new URI(readBack.storage.locationUri.get).getPath
+ val tableLocation = readBack.storage.locationUri.get.getPath
val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
assert(tableLocation == expectedLocation)
}
@@ -236,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
val readBack = getTableMetadata(tbl.identifier.table)
// trim the URI prefix
- val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+ val actualTableLocation = readBack.storage.locationUri.get.getPath
val expected = dir.toURI.getPath.stripSuffix("/")
assert(actualTableLocation == expected)
}
@@ -252,7 +251,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
assert(readBack.schema.sameType(expectedSchema))
// trim the URI prefix
- val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+ val actualTableLocation = readBack.storage.locationUri.get.getPath
val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
tempDir.toURI.getPath.stripSuffix("/")
} else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 16cf4d7ec6..892a22ddfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
+import java.net.URI
+
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -140,7 +142,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))
assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
- assert(hiveTable.storage.locationUri === Some(path.toString))
+ assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 8f0d5d886c..5f15a705a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -485,7 +485,7 @@ object SetWarehouseLocationTest extends Logging {
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
val expectedLocation =
- "file:" + expectedWarehouseLocation.toString + "/testlocation"
+ CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
val actualLocation = tableMetadata.location
if (actualLocation != expectedLocation) {
throw new Exception(
@@ -500,8 +500,8 @@ object SetWarehouseLocationTest extends Logging {
sparkSession.sql("create table testLocation (a int)")
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
- val expectedLocation =
- "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
+ val expectedLocation = CatalogUtils.stringToURI(
+ s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
val actualLocation = tableMetadata.location
if (actualLocation != expectedLocation) {
throw new Exception(
@@ -868,14 +868,16 @@ object SPARK_18360 {
val rawTable = hiveClient.getTable("default", "test_tbl")
// Hive will use the value of `hive.metastore.warehouse.dir` to generate default table
// location for tables in default database.
- assert(rawTable.storage.locationUri.get.contains(newWarehousePath))
+ assert(rawTable.storage.locationUri.map(
+ CatalogUtils.URIToString(_)).get.contains(newWarehousePath))
hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false)
spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false)
val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl")
// Spark SQL will use the location of default database to generate default table
// location for tables in default database.
- assert(readBack.storage.locationUri.get.contains(defaultDbLocation))
+ assert(readBack.storage.locationUri.map(CatalogUtils.URIToString(_))
+ .get.contains(defaultDbLocation))
} finally {
hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false)
hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 03ea0c8c77..f02b7218d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1011,7 +1011,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
identifier = TableIdentifier("not_skip_hive_metadata"),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(
- locationUri = Some(tempPath.getCanonicalPath),
+ locationUri = Some(tempPath.toURI),
properties = Map("skipHiveMetadata" -> "false")
),
schema = schema,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 47ee4dd4d9..4aea6d14ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.hive
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -26,8 +30,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
private def checkTablePath(dbName: String, tableName: String): Unit = {
val metastoreTable = spark.sharedState.externalCatalog.getTable(dbName, tableName)
- val expectedPath =
- spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
+ val expectedPath = new Path(new Path(
+ spark.sharedState.externalCatalog.getDatabase(dbName).locationUri), tableName).toUri
assert(metastoreTable.location === expectedPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d61d10bf86..dd624eca6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.client
import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -54,7 +55,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
test("success sanity check") {
val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
- val db = new CatalogDatabase("default", "desc", "loc", Map())
+ val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
badClient.createDatabase(db, ignoreIfExists = true)
}
@@ -125,10 +126,10 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
// Database related API
///////////////////////////////////////////////////////////////////////////
- val tempDatabasePath = Utils.createTempDir().getCanonicalPath
+ val tempDatabasePath = Utils.createTempDir().toURI
test(s"$version: createDatabase") {
- val defaultDB = CatalogDatabase("default", "desc", "loc", Map())
+ val defaultDB = CatalogDatabase("default", "desc", new URI("loc"), Map())
client.createDatabase(defaultDB, ignoreIfExists = true)
val tempDB = CatalogDatabase(
"temporary", description = "test create", tempDatabasePath, Map())
@@ -346,7 +347,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
test(s"$version: alterPartitions") {
val spec = Map("key1" -> "1", "key2" -> "2")
- val newLocation = Utils.createTempDir().getPath()
+ val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
val storage = storageFormat.copy(
locationUri = Some(newLocation),
// needed for 0.12 alter partitions
@@ -660,7 +661,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location.stripSuffix("/") == expectedPath)
+ assert(table.location == CatalogUtils.stringToURI(expectedPath))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)
@@ -669,7 +670,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
- assert(table1.location.stripSuffix("/") == expectedPath1)
+ assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 81ae5b7bdb..e956c9abae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.hive.execution
import java.io.File
+import java.lang.reflect.InvocationTargetException
+import java.net.URI
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
@@ -25,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -710,7 +712,7 @@ class HiveDDLSuite
}
sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
val db1 = catalog.getDatabaseMetadata(dbName)
- val dbPath = tmpDir.toURI.toString.stripSuffix("/")
+ val dbPath = new URI(tmpDir.toURI.toString.stripSuffix("/"))
assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))
sql("USE db1")
@@ -747,11 +749,12 @@ class HiveDDLSuite
sql(s"CREATE DATABASE $dbName")
val catalog = spark.sessionState.catalog
val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
+ val expectedDBUri = CatalogUtils.stringToURI(expectedDBLocation)
val db1 = catalog.getDatabaseMetadata(dbName)
assert(db1 == CatalogDatabase(
dbName,
"",
- expectedDBLocation,
+ expectedDBUri,
Map.empty))
// the database directory was created
assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
@@ -1606,7 +1609,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1626,7 +1629,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == dir.getAbsolutePath)
+ assert(table.location == new URI(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
@@ -1686,4 +1689,162 @@ class HiveDDLSuite
}
}
}
+
+ Seq("a b", "a:b", "a%b").foreach { specialChars =>
+ test(s"datasource table: location uri contains $specialChars") {
+ withTable("t", "t1") {
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t(a string)
+ |USING parquet
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ spark.sql("INSERT INTO TABLE t SELECT 1")
+ assert(loc.listFiles().length >= 1)
+ checkAnswer(spark.table("t"), Row("1") :: Nil)
+ }
+
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a string, b string)
+ |USING parquet
+ |PARTITIONED BY(b)
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+ val partFile = new File(loc, "b=2")
+ assert(partFile.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+ val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+ assert(!partFile1.exists())
+ val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+ assert(partFile2.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
+ }
+ }
+ }
+ }
+
+ Seq("a b", "a:b", "a%b").foreach { specialChars =>
+ test(s"hive table: location uri contains $specialChars") {
+ withTable("t") {
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t(a string)
+ |USING hive
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ val path = new Path(loc.getAbsolutePath)
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(table.location == fs.makeQualified(path).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ if (specialChars != "a:b") {
+ spark.sql("INSERT INTO TABLE t SELECT 1")
+ assert(loc.listFiles().length >= 1)
+ checkAnswer(spark.table("t"), Row("1") :: Nil)
+ } else {
+ val e = intercept[InvocationTargetException] {
+ spark.sql("INSERT INTO TABLE t SELECT 1")
+ }.getTargetException.getMessage
+ assert(e.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+ }
+ }
+
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ loc.mkdir()
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a string, b string)
+ |USING hive
+ |PARTITIONED BY(b)
+ |LOCATION '$loc'
+ """.stripMargin)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ val path = new Path(loc.getAbsolutePath)
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(table.location == fs.makeQualified(path).toUri)
+ assert(new Path(table.location).toString.contains(specialChars))
+
+ assert(loc.listFiles().isEmpty)
+ if (specialChars != "a:b") {
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+ val partFile = new File(loc, "b=2")
+ assert(partFile.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+ val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+ assert(!partFile1.exists())
+ val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+ assert(partFile2.listFiles().length >= 1)
+ checkAnswer(spark.table("t1"),
+ Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
+ } else {
+ val e = intercept[InvocationTargetException] {
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+ }.getTargetException.getMessage
+ assert(e.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+
+ val e1 = intercept[InvocationTargetException] {
+ spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+ }.getTargetException.getMessage
+ assert(e1.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+ }
+ }
+ }
+ }
+ }
+
+ Seq("a b", "a:b", "a%b").foreach { specialChars =>
+ test(s"location uri contains $specialChars for database") {
+ try {
+ withTable("t") {
+ withTempDir { dir =>
+ val loc = new File(dir, specialChars)
+ spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
+ spark.sql("USE tmpdb")
+
+ Seq(1).toDF("a").write.saveAsTable("t")
+ val tblloc = new File(loc, "t")
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ val tblPath = new Path(tblloc.getAbsolutePath)
+ val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(table.location == fs.makeQualified(tblPath).toUri)
+ assert(tblloc.listFiles().nonEmpty)
+ }
+ }
+ } finally {
+ spark.sql("DROP DATABASE IF EXISTS tmpdb")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ef2d451e6b..be9a5fd71b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -544,7 +544,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
userSpecifiedLocation match {
case Some(location) =>
- assert(r.tableMeta.location === location)
+ assert(r.tableMeta.location === CatalogUtils.stringToURI(location))
case None => // OK.
}
// Also make sure that the format and serde are as desired.