aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorwindpiger <songjun@outlook.com>2017-03-06 10:44:26 -0800
committerWenchen Fan <wenchen@databricks.com>2017-03-06 10:44:26 -0800
commit096df6d933c5326e5782aa8c5de842a0800eb369 (patch)
treea126f9307afbcac51f61b1b6038d541efac2a49c /sql/catalyst
parent46a64d1e0ae12c31e848f377a84fb28e3efb3699 (diff)
downloadspark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.gz
spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.bz2
spark-096df6d933c5326e5782aa8c5de842a0800eb369.zip
[SPARK-19257][SQL] location for table/partition/database should be java.net.URI
## What changes were proposed in this pull request? Currently we treat the location of table/partition/database as URI string. It will be safer if we can make the type of location as java.net.URI. In this PR, there are following classes changes: **1. CatalogDatabase** ``` case class CatalogDatabase( name: String, description: String, locationUri: String, properties: Map[String, String]) ---> case class CatalogDatabase( name: String, description: String, locationUri: URI, properties: Map[String, String]) ``` **2. CatalogStorageFormat** ``` case class CatalogStorageFormat( locationUri: Option[String], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ----> case class CatalogStorageFormat( locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) ``` Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally. Here list some operation related location: **1. whitespace in the location** e.g. `/a/b c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` , then `DESC EXTENDED t ` show the location is `/a/b c/d`, and the real path in the FileSystem also show `/a/b c/d` **2. colon(:) in the location** e.g. `/a/b:c/d` For both table location and partition location, when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` , **In linux file system** `DESC EXTENDED t ` show the location is `/a/b:c/d`, and the real path in the FileSystem also show `/a/b:c/d` **in HDFS** throw exception: `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.` **while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`, and the real path in the FileSystem also show `/xxx/a=a%3Ab` **3. percent sign(%) in the location** e.g. `/a/b%c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%c/d`, and the real path in the FileSystem also show `/a/b%c/d` **4. encoded(%25) in the location** e.g. `/a/b%25c/d` For both table location and partition location, After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` , then `DESC EXTENDED t ` show the location is `/a/b%25c/d`, and the real path in the FileSystem also show `/a/b%25c/d` **while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1` then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`, and the real path in the FileSystem also show `/xxx/a=%2525` **Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173) ### Summary: After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`, the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ). `DataBase` also have the same logic with `CREATE TABLE` while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem` In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString` which transfrom `str to uri `or `uri to str`. for example: ``` val str = '/a/b c/d' val uri = new Path(str).toUri --> '/a/b%20c/d' val strFromUri = new Path(uri).toString -> '/a/b c/d' ``` when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri ` ## How was this patch tested? unit test added. The `current master branch` also `passed all the test cases` added in this PR by a litter change. https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764 here `toURI` -> `toString` when test in master branch. This can show that this PR is transparent for user. Author: windpiger <songjun@outlook.com> Closes #17149 from windpiger/changeStringToURI.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala26
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala12
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala15
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala14
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala18
5 files changed, 56 insertions, 29 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 58ced549ba..a418edc302 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell
@@ -162,6 +164,30 @@ object CatalogUtils {
BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
}
+ /**
+ * Convert URI to String.
+ * Since URI.toString does not decode the uri, e.g. change '%25' to '%'.
+ * Here we create a hadoop Path with the given URI, and rely on Path.toString
+ * to decode the uri
+ * @param uri the URI of the path
+ * @return the String of the path
+ */
+ def URIToString(uri: URI): String = {
+ new Path(uri).toString
+ }
+
+ /**
+ * Convert String to URI.
+ * Since new URI(string) does not encode string, e.g. change '%' to '%25'.
+ * Here we create a hadoop Path with the given String, and rely on Path.toUri
+ * to encode the string
+ * @param str the String of the path
+ * @return the URI of the path
+ */
+ def stringToURI(str: String): URI = {
+ new Path(str).toUri
+ }
+
private def normalizeColumnName(
tableName: String,
tableCols: Seq[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 340e8451f1..80aba4af94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -202,7 +202,7 @@ class InMemoryCatalog(
tableDefinition.storage.locationUri.isEmpty
val tableWithLocation = if (needDefaultTableLocation) {
- val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+ val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
try {
val fs = defaultTableLocation.getFileSystem(hadoopConfig)
fs.mkdirs(defaultTableLocation)
@@ -211,7 +211,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to create table $table as failed " +
s"to create its directory $defaultTableLocation", e)
}
- tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+ tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
} else {
tableDefinition
}
@@ -274,7 +274,7 @@ class InMemoryCatalog(
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
val oldDir = new Path(oldDesc.table.location)
- val newDir = new Path(catalog(db).db.locationUri, newName)
+ val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
try {
val fs = oldDir.getFileSystem(hadoopConfig)
fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@ class InMemoryCatalog(
throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
s"to rename its directory $oldDir", e)
}
- oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+ oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
}
catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@ class InMemoryCatalog(
existingParts.put(
p.spec,
- p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+ p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
}
}
@@ -462,7 +462,7 @@ class InMemoryCatalog(
}
oldPartition.copy(
spec = newSpec,
- storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+ storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
} else {
oldPartition.copy(spec = newSpec)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f6412e42c1..498bfbde9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -131,10 +132,10 @@ class SessionCatalog(
* does not contain a scheme, this path will not be changed after the default
* FileSystem is changed.
*/
- private def makeQualifiedPath(path: String): Path = {
+ private def makeQualifiedPath(path: URI): URI = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(hadoopConf)
- fs.makeQualified(hadoopPath)
+ fs.makeQualified(hadoopPath).toUri
}
private def requireDbExists(db: String): Unit = {
@@ -170,7 +171,7 @@ class SessionCatalog(
"you cannot create a database with this name.")
}
validateName(dbName)
- val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
externalCatalog.createDatabase(
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
@@ -228,9 +229,9 @@ class SessionCatalog(
* Get the path for creating a non-default database when database location is not provided
* by users.
*/
- def getDefaultDBPath(db: String): String = {
+ def getDefaultDBPath(db: String): URI = {
val database = formatDatabaseName(db)
- new Path(new Path(conf.warehousePath), database + ".db").toString
+ new Path(new Path(conf.warehousePath), database + ".db").toUri
}
// ----------------------------------------------------------------------------
@@ -351,11 +352,11 @@ class SessionCatalog(
db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
}
- def defaultTablePath(tableIdent: TableIdentifier): String = {
+ def defaultTablePath(tableIdent: TableIdentifier): URI = {
val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
val dbLocation = getDatabaseMetadata(dbName).locationUri
- new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
+ new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
}
// ----------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 887caf07d1..4452c47987 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
import java.util.Date
import com.google.common.base.Objects
@@ -48,10 +49,7 @@ case class CatalogFunction(
* Storage format, used to describe how a partition or a table is stored.
*/
case class CatalogStorageFormat(
- // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
- // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
- // path.toUri respectively before use as a filesystem path due to URI char escaping.
- locationUri: Option[String],
+ locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
@@ -105,7 +103,7 @@ case class CatalogTablePartition(
}
/** Return the partition location, assuming it is specified. */
- def location: String = storage.locationUri.getOrElse {
+ def location: URI = storage.locationUri.getOrElse {
val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
}
@@ -210,7 +208,7 @@ case class CatalogTable(
}
/** Return the table location, assuming it is specified. */
- def location: String = storage.locationUri.getOrElse {
+ def location: URI = storage.locationUri.getOrElse {
throw new AnalysisException(s"table $identifier did not specify locationUri")
}
@@ -241,7 +239,7 @@ case class CatalogTable(
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
- locationUri: Option[String] = storage.locationUri,
+ locationUri: Option[URI] = storage.locationUri,
inputFormat: Option[String] = storage.inputFormat,
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
@@ -337,7 +335,7 @@ object CatalogTableType {
case class CatalogDatabase(
name: String,
description: String,
- locationUri: String,
+ locationUri: URI,
properties: Map[String, String])
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a065..07ccd68698 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.catalog
+import java.net.URI
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
@@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
"db1",
"tbl",
Map("partCol1" -> "1", "partCol2" -> "2")).location
- val tableLocation = catalog.getTable("db1", "tbl").location
+ val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
assert(new Path(partitionLocation) == defaultPartitionLocation)
}
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)
- val tableLocation = catalog.getTable("db1", "tbl").location
+ val tableLocation = new Path(catalog.getTable("db1", "tbl").location)
val mixedCasePart1 = CatalogTablePartition(
Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// File System operations
// --------------------------------------------------------------------------
- private def exists(uri: String, children: String*): Boolean = {
+ private def exists(uri: URI, children: String*): Boolean = {
val base = new Path(uri)
val finalPath = children.foldLeft(base) {
case (parent, child) => new Path(parent, child)
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
identifier = TableIdentifier("external_table", Some("db1")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
- Some(Utils.createTempDir().getAbsolutePath),
+ Some(Utils.createTempDir().toURI),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some(defaultProvider)
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithExistingDir = CatalogTablePartition(
Map("partCol1" -> "7", "partCol2" -> "8"),
CatalogStorageFormat(
- Some(tempPath.toURI.toString),
+ Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val partWithNonExistingDir = CatalogTablePartition(
Map("partCol1" -> "9", "partCol2" -> "10"),
CatalogStorageFormat(
- Some(tempPath.toURI.toString),
+ Some(tempPath.toURI),
None, None, None, false, Map.empty))
catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
assert(tempPath.exists())
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils {
def newFunc(): CatalogFunction = newFunc("funcName")
- def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
+ def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
def newDb(name: String): CatalogDatabase = {
CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
CatalogTable(
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
- storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
+ storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
schema = new StructType()
.add("col1", "int")
.add("col2", "string")