| author | windpiger <songjun@outlook.com> | 2017-03-06 10:44:26 -0800 |
|---|---|---|
| committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-06 10:44:26 -0800 |
| commit | 096df6d933c5326e5782aa8c5de842a0800eb369 (patch) | |
| tree | a126f9307afbcac51f61b1b6038d541efac2a49c /sql/catalyst/src | |
| parent | 46a64d1e0ae12c31e848f377a84fb28e3efb3699 (diff) | |
| download | spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.gz spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.bz2 spark-096df6d933c5326e5782aa8c5de842a0800eb369.zip | |
[SPARK-19257][SQL] location for table/partition/database should be java.net.URI
## What changes were proposed in this pull request?
Currently we treat the location of a table/partition/database as a URI string.
It is safer to make the type of the location `java.net.URI`.
This PR changes the following classes:
**1. CatalogDatabase**
```
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String])

--->

case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: URI,
    properties: Map[String, String])
```
**2. CatalogStorageFormat**
```
case class CatalogStorageFormat(
    locationUri: Option[String],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])

---->

case class CatalogStorageFormat(
    locationUri: Option[URI],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
```
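For illustration only (not code from this PR), a minimal sketch of how code constructing these classes looks after the change, using the `CatalogUtils.stringToURI` helper added by this PR; the names and paths are placeholder values:
```
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogUtils}

// The location is now carried as java.net.URI instead of String; callers convert
// a raw path string once, via the helper added in this PR.
val db = CatalogDatabase(
  name = "mydb",                                          // placeholder database name
  description = "example database",
  locationUri = CatalogUtils.stringToURI("/user/hive/warehouse/mydb.db"),
  properties = Map.empty)

val storage = CatalogStorageFormat(
  locationUri = Some(CatalogUtils.stringToURI("/a/b c/d")),  // special chars are URI-encoded
  inputFormat = None,
  outputFormat = None,
  serde = None,
  compressed = false,
  properties = Map.empty)
```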
This change is transparent to users: there is nothing they need to be concerned about, because the `String`-to-`URI` conversion happens entirely inside Spark SQL.
Here are some location-related behaviors:
**1. whitespace in the location**
e.g. `/a/b c/d`
For both table locations and partition locations,
after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'`,
`DESC EXTENDED t` shows the location as `/a/b c/d`,
and the real path in the filesystem is also `/a/b c/d`.
**2. colon(:) in the location**
e.g. `/a/b:c/d`
For both table locations and partition locations,
with `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'`:
**On a Linux filesystem**,
`DESC EXTENDED t` shows the location as `/a/b:c/d`,
and the real path in the filesystem is also `/a/b:c/d`.
**On HDFS**, it throws an exception:
`java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`
**However**, after `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`,
`DESC EXTENDED t` shows the location as `/xxx/a=a%3Ab`,
and the real path in the filesystem is also `/xxx/a=a%3Ab`.
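The `a=a%3Ab` directory name comes from partition-value escaping rather than from the location URI itself. A minimal sketch, assuming the existing `ExternalCatalogUtils.escapePathName` helper that Spark uses when building partition directory names:
```
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Partition values are escaped before becoming part of the directory name,
// which is why a value of "a:b" appears on disk as a=a%3Ab.
val escaped = ExternalCatalogUtils.escapePathName("a:b")  // expected: "a%3Ab"
val dirName = s"a=$escaped"                               // "a=a%3Ab"
```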
**3. percent sign(%) in the location**
e.g. `/a/b%c/d`
For both table locations and partition locations,
after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'`,
`DESC EXTENDED t` shows the location as `/a/b%c/d`,
and the real path in the filesystem is also `/a/b%c/d`.
**4. encoded percent sign(%25) in the location**
e.g. `/a/b%25c/d`
For both table locations and partition locations,
after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'`,
`DESC EXTENDED t` shows the location as `/a/b%25c/d`,
and the real path in the filesystem is also `/a/b%25c/d`.
**However**, after `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`,
`DESC EXTENDED t` shows the location as `/xxx/a=%2525`,
and the real path in the filesystem is also `/xxx/a=%2525`.
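Both percent cases follow from how Hadoop's `Path` encodes and decodes the location, as described in the helper comments added by this PR; a minimal sketch:
```
import org.apache.hadoop.fs.Path

// '%' is encoded when the path string becomes a URI, and decoded again on the way back.
val uri1 = new Path("/a/b%c/d").toUri     // expected: /a/b%25c/d
val str1 = new Path(uri1).toString        // back to /a/b%c/d

// A literal '%25' round-trips the same way, so DESC and the real path keep showing '%25'.
val uri2 = new Path("/a/b%25c/d").toUri   // expected: /a/b%2525c/d
val str2 = new Path(uri2).toString        // back to /a/b%25c/d
```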
**Additionally**, besides the location itself, two other factors affect the final location of a table/partition: the table name, which is not allowed to contain special characters, and the partition name, which is handled the same way as the partition value. Test cases for partition names with special characters, along with a bug fix, were added in this [PR](https://github.com/apache/spark/pull/17173).
### Summary:
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`,
the path we get from `DESC TABLE` and the real path in the filesystem are the same as the one given in the `CREATE TABLE` command (different filesystems allow different special characters in a path, e.g. HDFS does not allow a colon, but the Linux filesystem does).
Databases follow the same logic as `CREATE TABLE`.
However, if the partition value contains special characters such as `%`, `:`, or `#`, then the path we get from `DESC TABLE` and the real path in the filesystem contain the encoded partition value, e.g. `/xxx/a=A%25B`.
In this PR, the core change uses `new Path(str).toUri` and `new Path(uri).toString`
to convert a string to a URI and a URI back to a string.
For example:
```
import org.apache.hadoop.fs.Path

val str = "/a/b c/d"
val uri = new Path(str).toUri            // /a/b%20c/d
val strFromUri = new Path(uri).toString  // /a/b c/d
```
When we restore a table/partition from the metastore, or get a location from a `CREATE TABLE` command, we convert the string to a URI as above with `new Path(str).toUri`.
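The same conversions are wrapped by the `CatalogUtils.stringToURI` and `CatalogUtils.URIToString` helpers added in this PR; a short usage sketch:
```
import org.apache.spark.sql.catalyst.catalog.CatalogUtils

// stringToURI encodes on the way into the catalog, URIToString decodes on the way out.
val uri = CatalogUtils.stringToURI("/a/b c/d")  // URI path: /a/b%20c/d
val str = CatalogUtils.URIToString(uri)         // back to: /a/b c/d
```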
## How was this patch tested?
Unit tests were added.
The current master branch also passes all the test cases added in this PR after a small change:
https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764
where `toURI` is changed to `toString` when testing on the master branch.
This shows that this PR is transparent to users.
Author: windpiger <songjun@outlook.com>
Closes #17149 from windpiger/changeStringToURI.
Diffstat (limited to 'sql/catalyst/src')
5 files changed, 56 insertions, 29 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 58ced549ba..a418edc302 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
@@ -162,6 +164,30 @@ object CatalogUtils {
     BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
   }
 
+  /**
+   * Convert URI to String.
+   * Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
+   * Here we create a hadoop Path with the given URI, and rely on Path.toString
+   * to decode the uri
+   * @param uri the URI of the path
+   * @return the String of the path
+   */
+  def URIToString(uri: URI): String = {
+    new Path(uri).toString
+  }
+
+  /**
+   * Convert String to URI.
+   * Since new URI(string) does not encode string, e.g. change '%' to '%25'.
+   * Here we create a hadoop Path with the given String, and rely on Path.toUri
+   * to encode the string
+   * @param str the String of the path
+   * @return the URI of the path
+   */
+  def stringToURI(str: String): URI = {
+    new Path(str).toUri
+  }
+
   private def normalizeColumnName(
       tableName: String,
       tableCols: Seq[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 340e8451f1..80aba4af94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -202,7 +202,7 @@ class InMemoryCatalog(
           tableDefinition.storage.locationUri.isEmpty
 
       val tableWithLocation = if (needDefaultTableLocation) {
-        val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+        val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
         try {
           val fs = defaultTableLocation.getFileSystem(hadoopConfig)
           fs.mkdirs(defaultTableLocation)
@@ -211,7 +211,7 @@
             throw new SparkException(s"Unable to create table $table as failed " +
               s"to create its directory $defaultTableLocation", e)
         }
-        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
       } else {
         tableDefinition
       }
@@ -274,7 +274,7 @@
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
       val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(catalog(db).db.locationUri, newName)
+      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
     }
 
     catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@
 
         existingParts.put(
           p.spec,
-          p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+          p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
       }
     }
 
@@ -462,7 +462,7 @@
         }
         oldPartition.copy(
           spec = newSpec,
-          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
       } else {
         oldPartition.copy(spec = newSpec)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f6412e42c1..498bfbde9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -131,10 +132,10 @@ class SessionCatalog(
    * does not contain a scheme, this path will not be changed after the default
    * FileSystem is changed.
    */
-  private def makeQualifiedPath(path: String): Path = {
+  private def makeQualifiedPath(path: URI): URI = {
     val hadoopPath = new Path(path)
     val fs = hadoopPath.getFileSystem(hadoopConf)
-    fs.makeQualified(hadoopPath)
+    fs.makeQualified(hadoopPath).toUri
   }
 
   private def requireDbExists(db: String): Unit = {
@@ -170,7 +171,7 @@
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
@@ -228,9 +229,9 @@
    * Get the path for creating a non-default database when database location is not provided
    * by users.
    */
-  def getDefaultDBPath(db: String): String = {
+  def getDefaultDBPath(db: String): URI = {
     val database = formatDatabaseName(db)
-    new Path(new Path(conf.warehousePath), database + ".db").toString
+    new Path(new Path(conf.warehousePath), database + ".db").toUri
   }
 
   // ----------------------------------------------------------------------------
@@ -351,11 +352,11 @@
       db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
   }
 
-  def defaultTablePath(tableIdent: TableIdentifier): String = {
+  def defaultTablePath(tableIdent: TableIdentifier): URI = {
     val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
     val dbLocation = getDatabaseMetadata(dbName).locationUri
 
-    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
   }
 
   // ----------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 887caf07d1..4452c47987 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
 import java.util.Date
 
 import com.google.common.base.Objects
@@ -48,10 +49,7 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
-    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
-    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
-    // path.toUri respectively before use as a filesystem path due to URI char escaping.
-    locationUri: Option[String],
+    locationUri: Option[URI],
     inputFormat: Option[String],
     outputFormat: Option[String],
     serde: Option[String],
@@ -105,7 +103,7 @@ case class CatalogTablePartition(
   }
 
   /** Return the partition location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
   }
@@ -210,7 +208,7 @@ case class CatalogTable(
   }
 
   /** Return the table location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify locationUri")
   }
 
@@ -241,7 +239,7 @@ case class CatalogTable(
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
-      locationUri: Option[String] = storage.locationUri,
+      locationUri: Option[URI] = storage.locationUri,
       inputFormat: Option[String] = storage.inputFormat,
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
@@ -337,7 +335,7 @@ object CatalogTableType {
 
 case class CatalogDatabase(
     name: String,
     description: String,
-    locationUri: String,
+    locationUri: URI,
     properties: Map[String, String])
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a065..07ccd68698 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       "db1",
       "tbl",
       Map("partCol1" -> "1", "partCol2" -> "2")).location
-    val tableLocation = catalog.getTable("db1", "tbl").location
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
     val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)
 
-    val tableLocation = catalog.getTable("db1", "tbl").location
+    val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
 
     val mixedCasePart1 = CatalogTablePartition(
       Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   // File System operations
   // --------------------------------------------------------------------------
 
-  private def exists(uri: String, children: String*): Boolean = {
+  private def exists(uri: URI, children: String*): Boolean = {
     val base = new Path(uri)
     val finalPath = children.foldLeft(base) { case (parent, child) =>
       new Path(parent, child)
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("external_table", Some("db1")),
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
+        Some(Utils.createTempDir().toURI),
         None, None, None, false, Map.empty),
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some(defaultProvider)
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val partWithExistingDir = CatalogTablePartition(
       Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
 
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val partWithNonExistingDir = CatalogTablePartition(
       Map("partCol1" -> "9", "partCol2" -> "10"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
     assert(tempPath.exists())
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils {
 
   def newFunc(): CatalogFunction = newFunc("funcName")
 
-  def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
+  def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
 
   def newDb(name: String): CatalogDatabase = {
     CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
+      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")