From 096df6d933c5326e5782aa8c5de842a0800eb369 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Mar 2017 10:44:26 -0800
Subject: [SPARK-19257][SQL] location for table/partition/database should be java.net.URI

## What changes were proposed in this pull request?

Currently we treat the location of a table/partition/database as a URI string. It is safer to make the type of the location `java.net.URI`.

This PR changes the following classes:

**1. CatalogDatabase**

```
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String])
--->
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: URI,
    properties: Map[String, String])
```

**2. CatalogStorageFormat**

```
case class CatalogStorageFormat(
    locationUri: Option[String],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
--->
case class CatalogStorageFormat(
    locationUri: Option[URI],
    inputFormat: Option[String],
    outputFormat: Option[String],
    serde: Option[String],
    compressed: Boolean,
    properties: Map[String, String])
```

This change is transparent to users; there is nothing they need to do differently. The `String`-to-`URI` conversion happens entirely inside Spark SQL.

Here are some location-related behaviors:

**1. Whitespace in the location**, e.g. `/a/b c/d`

For both table and partition locations, after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'`, `DESC EXTENDED t` shows the location as `/a/b c/d`, and the real path in the file system is also `/a/b c/d`.

**2. Colon (:) in the location**, e.g. `/a/b:c/d`

For both table and partition locations, after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'`:

**On a Linux file system**, `DESC EXTENDED t` shows the location as `/a/b:c/d`, and the real path in the file system is also `/a/b:c/d`.

**On HDFS**, it throws an exception: `java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`

**However**, after `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`, `DESC EXTENDED t` shows the location as `/xxx/a=a%3Ab`, and the real path in the file system is also `/xxx/a=a%3Ab`.

**3. Percent sign (%) in the location**, e.g. `/a/b%c/d`

For both table and partition locations, after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'`, `DESC EXTENDED t` shows the location as `/a/b%c/d`, and the real path in the file system is also `/a/b%c/d`.

**4. Encoded characters (%25) in the location**, e.g. `/a/b%25c/d`

For both table and partition locations, after `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'`, `DESC EXTENDED t` shows the location as `/a/b%25c/d`, and the real path in the file system is also `/a/b%25c/d`.

**However**, after `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`, `DESC EXTENDED t` shows the location as `/xxx/a=%2525`, and the real path in the file system is also `/xxx/a=%2525`.

**Additionally**, besides the location itself, two other factors affect the final location of a table/partition. One is the table name, which is not allowed to contain special characters. The other is the `partition name`, which is handled the same way as the `partition value`; test cases for `partition name`s containing special characters, along with a bug fix, were added in this [PR](https://github.com/apache/spark/pull/17173).

### Summary:

After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`, the path we get from `DESC TABLE` and the real path in the file system are the same as the one given in the `CREATE TABLE` command. (Different file systems differ in which special characters they allow when creating a path; e.g. HDFS does not allow a colon, but a Linux file system does.) `CREATE DATABASE` follows the same logic as `CREATE TABLE`. However, if the `partition value` contains special characters such as `%`, `:`, or `#`, then `DESC TABLE` and the real path in the file system show the path with the `partition value` encoded, e.g. `/xxx/a=A%25B`.

In this PR, the core change is to use `new Path(str).toUri` and `new Path(uri).toString` to transform a string to a URI and back. For example:

```
val str = "/a/b c/d"
val uri = new Path(str).toUri           // /a/b%20c/d
val strFromUri = new Path(uri).toString // /a/b c/d
```

When we restore a table/partition from the metastore, or get the location from a `CREATE TABLE` command, we convert the string to a URI with `new Path(str).toUri` as above.

## How was this patch tested?

Unit tests added. The current master branch also passes all the test cases added in this PR after one small change: https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764 (change `toURI` to `toString` when testing on master). This shows that this PR is transparent to users.

Author: windpiger

Closes #17149 from windpiger/changeStringToURI.
---
 .../spark/sql/hive/HiveExternalCatalog.scala       |  21 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |   4 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  15 +-
 .../apache/spark/sql/hive/client/HiveShim.scala    |   9 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala       |  12 +-
 ...ExternalCatalogBackwardCompatibilitySuite.scala |  23 ++-
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala |   4 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |  12 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |   2 +-
 .../apache/spark/sql/hive/MultiDatabaseSuite.scala |   8 +-
 .../spark/sql/hive/client/VersionsSuite.scala      |  13 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 171 ++++++++++++++++++++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |   4 +-
 14 files changed, 236 insertions(+), 64 deletions(-)

(limited to 'sql/hive')

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 43d9c2bec6..9ab4624594 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition.storage.locationUri.isEmpty

     val tableLocation = if (needDefaultTableLocation) {
-      Some(defaultTablePath(tableDefinition.identifier))
+      Some(CatalogUtils.stringToURI(defaultTablePath(tableDefinition.identifier)))
     } else {
       tableDefinition.storage.locationUri
     }
@@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
     val storagePropsWithLocation = table.storage.properties ++
-      table.storage.locationUri.map("path" -> _)
+      table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))

     // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
     // bucket specification to empty. Note that partition columns are retained, so that we can
@@ -285,7 +285,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // compatible format, which means the data source is file-based and must have a `path`.
       require(table.storage.locationUri.isDefined,
         "External file-based data source table must have a `path` entry in storage properties.")
-      Some(new Path(table.location).toUri.toString)
+      Some(table.location)
     } else {
       None
     }
@@ -432,13 +432,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     //
     // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
     val tempPath = {
-      val dbLocation = getDatabase(tableDefinition.database).locationUri
+      val dbLocation = new Path(getDatabase(tableDefinition.database).locationUri)
       new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
     }

     try {
       client.createTable(
-        tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
+        tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)),
         ignoreIfExists)
     } finally {
       FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
@@ -563,7 +563,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // want to alter the table location to a file path, we will fail. This should be fixed
       // in the future.
-      val newLocation = tableDefinition.storage.locationUri
+      val newLocation = tableDefinition.storage.locationUri.map(CatalogUtils.URIToString(_))
       val storageWithPathOption = tableDefinition.storage.copy(
         properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
@@ -704,7 +704,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val storageWithLocation = {
       val tableLocation = getLocationFromStorageProps(table)
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+      updateLocationInStorageProps(table, newPath = None).copy(
+        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
@@ -848,10 +849,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
@@ -890,7 +891,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val newParts = newSpecs.map { spec =>
       val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
       val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-      partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+      partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
     }
     alterPartitions(db, table, newParts)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151a69aebf..4d3b6c3cec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -128,7 +128,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)

     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
-    val tablePath = new Path(new URI(relation.tableMeta.location))
+    val tablePath = new Path(relation.tableMeta.location)
     val result = if (relation.isPartitioned) {
       val partitionSchema = relation.tableMeta.partitionSchema
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
@@ -141,7 +141,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         // locations,_omitting_ the table's base path.
         val paths = sparkSession.sharedState.externalCatalog
           .listPartitions(tableIdentifier.database, tableIdentifier.name)
-          .map(p => new Path(new URI(p.storage.locationUri.get)))
+          .map(p => new Path(p.storage.locationUri.get))

         if (paths.isEmpty) {
           Seq(tablePath)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 624cfa206e..b5ce027d51 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -133,7 +133,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
       try {
         val hadoopConf = session.sessionState.newHadoopConf()
-        val tablePath = new Path(new URI(table.location))
+        val tablePath = new Path(table.location)
         val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
         fs.getContentSummary(tablePath).getLength
       } catch {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7acaa9a7ab..469c9d84de 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -317,7 +317,7 @@ private[hive] class HiveClientImpl(
       new HiveDatabase(
         database.name,
         database.description,
-        database.locationUri,
+        CatalogUtils.URIToString(database.locationUri),
         Option(database.properties).map(_.asJava).orNull),
       ignoreIfExists)
   }
@@ -335,7 +335,7 @@ private[hive] class HiveClientImpl(
       new HiveDatabase(
         database.name,
         database.description,
-        database.locationUri,
+        CatalogUtils.URIToString(database.locationUri),
         Option(database.properties).map(_.asJava).orNull))
   }
@@ -344,7 +344,7 @@ private[hive] class HiveClientImpl(
       CatalogDatabase(
         name = d.getName,
         description = d.getDescription,
-        locationUri = d.getLocationUri,
+        locationUri = CatalogUtils.stringToURI(d.getLocationUri),
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))
   }
@@ -410,7 +410,7 @@ private[hive] class HiveClientImpl(
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h),
+          locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI(_)),
           // To avoid ClassNotFound exception, we try our best to not get the format class, but get
           // the class name directly. However, for non-native tables, there is no interface to get
           // the format class name, so we may still throw ClassNotFound in this case.
@@ -851,7 +851,8 @@ private[hive] object HiveClientImpl {
     conf.foreach(c => hiveTable.setOwner(c.getUser))
     hiveTable.setCreateTime((table.createTime / 1000).toInt)
     hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt)
-    table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)}
+    table.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach { loc =>
+      hiveTable.getTTable.getSd.setLocation(loc)}
     table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass)
     table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass)
     hiveTable.setSerializationLib(
@@ -885,7 +886,7 @@ private[hive] object HiveClientImpl {
     }
     val storageDesc = new StorageDescriptor
     val serdeInfo = new SerDeInfo
-    p.storage.locationUri.foreach(storageDesc.setLocation)
+    p.storage.locationUri.map(CatalogUtils.URIToString(_)).foreach(storageDesc.setLocation)
     p.storage.inputFormat.foreach(storageDesc.setInputFormat)
     p.storage.outputFormat.foreach(storageDesc.setOutputFormat)
     p.storage.serde.foreach(serdeInfo.setSerializationLib)
@@ -906,7 +907,7 @@ private[hive] object HiveClientImpl {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(apiPartition.getSd.getLocation),
+        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 7280748361..c6188fc683 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -24,10 +24,9 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JS
 import java.util.concurrent.TimeUnit

 import scala.collection.JavaConverters._
-import scala.util.Try
 import scala.util.control.NonFatal

-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
@@ -41,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
@@ -268,7 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(
-        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
+        uri => new Path(table.getPath, new Path(uri))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +462,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(
-        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
+        s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString(_)).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6d7a1c3937..490e02d0bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.hive

+import java.net.URI
+
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -70,7 +72,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/user/external/page_view"))
+    assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
     assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     assert(desc.comment == Some("This is the staging page view table"))
     // TODO will be SQLText
@@ -102,7 +104,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.identifier.database == Some("mydb"))
     assert(desc.identifier.table == "page_view")
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/user/external/page_view"))
+    assert(desc.storage.locationUri == Some(new URI("/user/external/page_view")))
     assert(desc.schema.isEmpty) // will be populated later when the table is actually created
     // TODO will be SQLText
     assert(desc.comment == Some("This is the staging page view table"))
@@ -338,7 +340,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'"
     val (desc, _) = extractTableDesc(query)
     assert(desc.tableType == CatalogTableType.EXTERNAL)
-    assert(desc.storage.locationUri == Some("/path/to/nowhere"))
+    assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere")))
   }

   test("create table - if not exists") {
@@ -469,7 +471,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
     assert(desc.viewText.isEmpty)
     assert(desc.viewDefaultDatabase.isEmpty)
     assert(desc.viewQueryColumnNames.isEmpty)
-    assert(desc.storage.locationUri == Some("/path/to/mercury"))
+    assert(desc.storage.locationUri == Some(new URI("/path/to/mercury")))
     assert(desc.storage.inputFormat == Some("winput"))
     assert(desc.storage.outputFormat == Some("wowput"))
     assert(desc.storage.serde == Some("org.apache.poof.serde.Baff"))
@@ -644,7 +646,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
       .add("id", "int")
       .add("name", "string", nullable = true, comment = "blabla"))
     assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
-    assert(table.storage.locationUri == Some("/tmp/file"))
+    assert(table.storage.locationUri == Some(new URI("/tmp/file")))
     assert(table.storage.properties == Map("my_prop" -> "1"))
     assert(table.comment == Some("BLABLA"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
index ee632d24b7..705d43f1f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
@@ -40,7 +40,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

   val tempDir = Utils.createTempDir().getCanonicalFile
-  val tempDirUri = tempDir.toURI.toString.stripSuffix("/")
+  val tempDirUri = tempDir.toURI
+  val tempDirStr = tempDir.getAbsolutePath

   override def beforeEach(): Unit = {
     sql("CREATE DATABASE test_db")
@@ -59,9 +60,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   }

   private def defaultTableURI(tableName: String): URI = {
-    val defaultPath =
-      spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
-    new Path(defaultPath).toUri
+    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
   }

   // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
@@ -170,8 +169,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     identifier = TableIdentifier("tbl7", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"),
-      properties = Map("path" -> tempDirUri)),
+      locationUri = Some(new URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")),
+      properties = Map("path" -> tempDirStr)),
     schema = new StructType(),
     provider = Some("json"),
     properties = Map(
@@ -184,7 +183,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
       locationUri = Some(tempDirUri),
-      properties = Map("path" -> tempDirUri)),
+      properties = Map("path" -> tempDirStr)),
     schema = simpleSchema,
     properties = Map(
       "spark.sql.sources.provider" -> "parquet",
@@ -195,8 +194,8 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     identifier = TableIdentifier("tbl9", Some("test_db")),
     tableType = CatalogTableType.EXTERNAL,
     storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"),
-      properties = Map("path" -> tempDirUri)),
+      locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")),
+      properties = Map("path" -> tempDirStr)),
     schema = new StructType(),
     provider = Some("json"),
     properties = Map("spark.sql.sources.provider" -> "json"))
@@ -220,7 +219,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     if (tbl.tableType == CatalogTableType.EXTERNAL) {
       // trim the URI prefix
-      val tableLocation = new URI(readBack.storage.locationUri.get).getPath
+      val tableLocation = readBack.storage.locationUri.get.getPath
       val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
       assert(tableLocation == expectedLocation)
     }
@@ -236,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     val readBack = getTableMetadata(tbl.identifier.table)

     // trim the URI prefix
-    val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+    val actualTableLocation = readBack.storage.locationUri.get.getPath
     val expected = dir.toURI.getPath.stripSuffix("/")
     assert(actualTableLocation == expected)
   }
@@ -252,7 +251,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
     assert(readBack.schema.sameType(expectedSchema))

     // trim the URI prefix
-    val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
+    val actualTableLocation = readBack.storage.locationUri.get.getPath
     val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
       tempDir.toURI.getPath.stripSuffix("/")
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 16cf4d7ec6..892a22ddfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.hive

+import java.net.URI
+
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -140,7 +142,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
     assert(hiveTable.storage.serde === Some(serde))

     assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
-    assert(hiveTable.storage.locationUri === Some(path.toString))
+    assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))

     val columns = hiveTable.schema
     assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 8f0d5d886c..5f15a705a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -485,7 +485,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
       val expectedLocation =
-        "file:" + expectedWarehouseLocation.toString + "/testlocation"
+        CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -500,8 +500,8 @@ object SetWarehouseLocationTest extends Logging {
       sparkSession.sql("create table testLocation (a int)")
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
-      val expectedLocation =
-        "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
+      val expectedLocation = CatalogUtils.stringToURI(
+        s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -868,14 +868,16 @@ object SPARK_18360 {
       val rawTable = hiveClient.getTable("default", "test_tbl")
       // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table
       // location for tables in default database.
-      assert(rawTable.storage.locationUri.get.contains(newWarehousePath))
+      assert(rawTable.storage.locationUri.map(
+        CatalogUtils.URIToString(_)).get.contains(newWarehousePath))
       hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false)

       spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false)
       val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl")
       // Spark SQL will use the location of default database to generate default table
       // location for tables in default database.
-      assert(readBack.storage.locationUri.get.contains(defaultDbLocation))
+      assert(readBack.storage.locationUri.map(CatalogUtils.URIToString(_))
+        .get.contains(defaultDbLocation))
     } finally {
       hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false)
       hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 03ea0c8c77..f02b7218d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1011,7 +1011,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           identifier = TableIdentifier("not_skip_hive_metadata"),
           tableType = CatalogTableType.EXTERNAL,
           storage = CatalogStorageFormat.empty.copy(
-            locationUri = Some(tempPath.getCanonicalPath),
+            locationUri = Some(tempPath.toURI),
             properties = Map("skipHiveMetadata" -> "false")
           ),
           schema = schema,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 47ee4dd4d9..4aea6d14ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -17,6 +17,10 @@
 package org.apache.spark.sql.hive

+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -26,8 +30,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = spark.sharedState.externalCatalog.getTable(dbName, tableName)
-    val expectedPath =
-      spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
+    val expectedPath = new Path(new Path(
+      spark.sharedState.externalCatalog.getDatabase(dbName).locationUri), tableName).toUri

     assert(metastoreTable.location === expectedPath)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d61d10bf86..dd624eca6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.client

 import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.net.URI

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -54,7 +55,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
   test("success sanity check") {
     val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
-    val db = new CatalogDatabase("default", "desc", "loc", Map())
+    val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
     badClient.createDatabase(db, ignoreIfExists = true)
   }
@@ -125,10 +126,10 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
   // Database related API
   ///////////////////////////////////////////////////////////////////////////

-  val tempDatabasePath = Utils.createTempDir().getCanonicalPath
+  val tempDatabasePath = Utils.createTempDir().toURI

   test(s"$version: createDatabase") {
-    val defaultDB = CatalogDatabase("default", "desc", "loc", Map())
+    val defaultDB = CatalogDatabase("default", "desc", new URI("loc"), Map())
     client.createDatabase(defaultDB, ignoreIfExists = true)

     val tempDB = CatalogDatabase(
       "temporary", description = "test create", tempDatabasePath, Map())
@@ -346,7 +347,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
   test(s"$version: alterPartitions") {
     val spec = Map("key1" -> "1", "key2" -> "2")
-    val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
     val storage = storageFormat.copy(
       locationUri = Some(newLocation),
       // needed for 0.12 alter partitions
@@ -660,7 +661,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
       val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-      assert(table.location.stripSuffix("/") == expectedPath)
+      assert(table.location == CatalogUtils.stringToURI(expectedPath))
       assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))

       checkAnswer(spark.table("t"), Row("1") :: Nil)
@@ -669,7 +670,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
       val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
       val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
-      assert(table1.location.stripSuffix("/") == expectedPath1)
+      assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
       assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
       checkAnswer(spark.table("t1"), Row(2) :: Nil)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 81ae5b7bdb..e956c9abae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hive.execution

 import java.io.File
+import java.lang.reflect.InvocationTargetException
+import java.net.URI

 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -25,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -710,7 +712,7 @@ class HiveDDLSuite
     }
     sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
     val db1 = catalog.getDatabaseMetadata(dbName)
-    val dbPath = tmpDir.toURI.toString.stripSuffix("/")
+    val dbPath = new URI(tmpDir.toURI.toString.stripSuffix("/"))
     assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))

     sql("USE db1")
@@ -747,11 +749,12 @@ class HiveDDLSuite
     sql(s"CREATE DATABASE $dbName")
     val catalog = spark.sessionState.catalog
     val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
+    val expectedDBUri = CatalogUtils.stringToURI(expectedDBLocation)
     val db1 = catalog.getDatabaseMetadata(dbName)
     assert(db1 == CatalogDatabase(
       dbName,
       "",
-      expectedDBLocation,
+      expectedDBUri,
       Map.empty))
     // the database directory was created
     assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
@@ -1606,7 +1609,7 @@ class HiveDDLSuite
        """.stripMargin)

       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-      assert(table.location == dir.getAbsolutePath)
+      assert(table.location == new URI(dir.getAbsolutePath))

       checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
     }
@@ -1626,7 +1629,7 @@ class HiveDDLSuite
        """.stripMargin)

       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-      assert(table.location == dir.getAbsolutePath)
+      assert(table.location == new URI(dir.getAbsolutePath))

       val partDir = new File(dir, "a=3")
       assert(partDir.exists())
@@ -1686,4 +1689,162 @@ class HiveDDLSuite
       }
     }
   }
+
+  Seq("a b", "a:b", "a%b").foreach { specialChars =>
+    test(s"datasource table: location uri contains $specialChars") {
+      withTable("t", "t1") {
+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t(a string)
+               |USING parquet
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          spark.sql("INSERT INTO TABLE t SELECT 1")
+          assert(loc.listFiles().length >= 1)
+          checkAnswer(spark.table("t"), Row("1") :: Nil)
+        }
+
+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t1(a string, b string)
+               |USING parquet
+               |PARTITIONED BY(b)
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == new Path(loc.getAbsolutePath).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+          val partFile = new File(loc, "b=2")
+          assert(partFile.listFiles().length >= 1)
+          checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+          spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+          val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+          assert(!partFile1.exists())
+          val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+          assert(partFile2.listFiles().length >= 1)
+          checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
+        }
+      }
+    }
+  }
+
+  Seq("a b", "a:b", "a%b").foreach { specialChars =>
+    test(s"hive table: location uri contains $specialChars") {
+      withTable("t") {
+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t(a string)
+               |USING hive
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          val path = new Path(loc.getAbsolutePath)
+          val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+          assert(table.location == fs.makeQualified(path).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          if (specialChars != "a:b") {
+            spark.sql("INSERT INTO TABLE t SELECT 1")
+            assert(loc.listFiles().length >= 1)
+            checkAnswer(spark.table("t"), Row("1") :: Nil)
+          } else {
+            val e = intercept[InvocationTargetException] {
+              spark.sql("INSERT INTO TABLE t SELECT 1")
+            }.getTargetException.getMessage
+            assert(e.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+          }
+        }

+        withTempDir { dir =>
+          val loc = new File(dir, specialChars)
+          loc.mkdir()
+          spark.sql(
+            s"""
+               |CREATE TABLE t1(a string, b string)
+               |USING hive
+               |PARTITIONED BY(b)
+               |LOCATION '$loc'
+             """.stripMargin)
+
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          val path = new Path(loc.getAbsolutePath)
+          val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+          assert(table.location == fs.makeQualified(path).toUri)
+          assert(new Path(table.location).toString.contains(specialChars))
+
+          assert(loc.listFiles().isEmpty)
+          if (specialChars != "a:b") {
+            spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+            val partFile = new File(loc, "b=2")
+            assert(partFile.listFiles().length >= 1)
+            checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
+
+            spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+            val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
+            assert(!partFile1.exists())
+            val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
+            assert(partFile2.listFiles().length >= 1)
+            checkAnswer(spark.table("t1"),
+              Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
+          } else {
+            val e = intercept[InvocationTargetException] {
+              spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
+            }.getTargetException.getMessage
+            assert(e.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+
+            val e1 = intercept[InvocationTargetException] {
+              spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
+            }.getTargetException.getMessage
+            assert(e1.contains("java.net.URISyntaxException: Relative path in absolute URI: a:b"))
+          }
+        }
+      }
+    }
+  }
+
+  Seq("a b", "a:b", "a%b").foreach { specialChars =>
+    test(s"location uri contains $specialChars for database") {
+      try {
+        withTable("t") {
+          withTempDir { dir =>
+            val loc = new File(dir, specialChars)
+            spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
+            spark.sql("USE tmpdb")
+
+            Seq(1).toDF("a").write.saveAsTable("t")
+            val tblloc = new File(loc, "t")
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            val tblPath = new Path(tblloc.getAbsolutePath)
+            val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
+            assert(table.location == fs.makeQualified(tblPath).toUri)
+            assert(tblloc.listFiles().nonEmpty)
+          }
+        }
+      } finally {
+        spark.sql("DROP DATABASE IF EXISTS tmpdb")
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ef2d451e6b..be9a5fd71b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -544,7 +544,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
       userSpecifiedLocation match {
         case Some(location) =>
-          assert(r.tableMeta.location === location)
+          assert(r.tableMeta.location === CatalogUtils.stringToURI(location))
         case None => // OK.
       }
       // Also make sure that the format and serde are as desired.
--
cgit v1.2.3
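
As a reference for how the core conversion described above behaves, here is a minimal, self-contained sketch. It assumes only Hadoop's `org.apache.hadoop.fs.Path`; the `CatalogUtils.stringToURI`/`CatalogUtils.URIToString` helpers used throughout the diff presumably wrap this same round-trip, as the description states. The object name `LocationUriRoundTrip` is illustrative and not part of the patch.

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object LocationUriRoundTrip {
  // String -> URI: Path builds its URI with the multi-argument java.net.URI
  // constructor, so characters that are illegal in a URI (space, '%', ...)
  // are percent-encoded rather than rejected.
  def stringToURI(str: String): URI = new Path(str).toUri

  // URI -> String: Path.toString renders the decoded path, so the user sees
  // exactly the location string given in CREATE TABLE ... LOCATION.
  def uriToString(uri: URI): String = new Path(uri).toString

  def main(args: Array[String]): Unit = {
    // The four cases from the description: whitespace, colon, percent, %25.
    Seq("/a/b c/d", "/a/b:c/d", "/a/b%c/d", "/a/b%25c/d").foreach { loc =>
      val uri = stringToURI(loc)
      // e.g. "/a/b c/d" becomes the URI /a/b%20c/d and round-trips unchanged.
      println(s"$loc -> $uri -> ${uriToString(uri)}")
    }
  }
}
```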
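Partition values take a different route: they are escaped before being used as a path segment, which is why `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1` above produces a directory named `a=a%3Ab`. The sketch below is a toy version of that escaping, assuming nothing beyond the standard library; the real logic used in the diff sits behind `ExternalCatalogUtils.generatePartitionPath`, and the set of characters it escapes is wider than shown here.

```scala
object PartitionEscapeSketch {
  // A few characters that are unsafe in a partition directory name. This is
  // deliberately a small, illustrative subset of what Spark actually escapes.
  private val unsafeChars: Set[Char] = Set(':', '%', '#', '/', '\\')

  // Escape each unsafe character as %XX, mirroring a=a:b -> a=a%3Ab.
  def escapePathName(value: String): String =
    value.flatMap { c =>
      if (unsafeChars.contains(c) || c.isControl) f"%%${c.toInt}%02X"
      else c.toString
    }

  def main(args: Array[String]): Unit = {
    println("a=" + escapePathName("a:b")) // a=a%3Ab, matching the example above
    println("a=" + escapePathName("%25")) // a=%2525, matching the example above
  }
}
```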