diff options
author | windpiger <songjun@outlook.com> | 2017-03-06 10:44:26 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-06 10:44:26 -0800 |
commit | 096df6d933c5326e5782aa8c5de842a0800eb369 (patch) | |
tree | a126f9307afbcac51f61b1b6038d541efac2a49c /sql/core/src/test/scala/org/apache | |
parent | 46a64d1e0ae12c31e848f377a84fb28e3efb3699 (diff) | |
download | spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.gz spark-096df6d933c5326e5782aa8c5de842a0800eb369.tar.bz2 spark-096df6d933c5326e5782aa8c5de842a0800eb369.zip |
[SPARK-19257][SQL] location for table/partition/database should be java.net.URI
## What changes were proposed in this pull request?
Currently we treat the location of table/partition/database as URI string.
It will be safer if we can make the type of location as java.net.URI.
In this PR, there are following classes changes:
**1. CatalogDatabase**
```
case class CatalogDatabase(
name: String,
description: String,
locationUri: String,
properties: Map[String, String])
--->
case class CatalogDatabase(
name: String,
description: String,
locationUri: URI,
properties: Map[String, String])
```
**2. CatalogStorageFormat**
```
case class CatalogStorageFormat(
locationUri: Option[String],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
properties: Map[String, String])
---->
case class CatalogStorageFormat(
locationUri: Option[URI],
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
properties: Map[String, String])
```
Before and After this PR, it is transparent for user, there is no change that the user should concern. The `String` to `URI` just happened in SparkSQL internally.
Here list some operation related location:
**1. whitespace in the location**
e.g. `/a/b c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b c/d`,
and the real path in the FileSystem also show `/a/b c/d`
**2. colon(:) in the location**
e.g. `/a/b:c/d`
For both table location and partition location,
when `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b:c/d'` ,
**In linux file system**
`DESC EXTENDED t ` show the location is `/a/b:c/d`,
and the real path in the FileSystem also show `/a/b:c/d`
**in HDFS** throw exception:
`java.lang.IllegalArgumentException: Pathname /a/b:c/d from hdfs://iZbp1151s8hbnnwriekxdeZ:9000/a/b:c/d is not a valid DFS filename.`
**while** After `INSERT INTO TABLE t PARTITION(a="a:b") SELECT 1`
then `DESC EXTENDED t ` show the location is `/xxx/a=a%3Ab`,
and the real path in the FileSystem also show `/xxx/a=a%3Ab`
**3. percent sign(%) in the location**
e.g. `/a/b%c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b%c/d`,
and the real path in the FileSystem also show `/a/b%c/d`
**4. encoded(%25) in the location**
e.g. `/a/b%25c/d`
For both table location and partition location,
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION '/a/b%25c/d'` ,
then `DESC EXTENDED t ` show the location is `/a/b%25c/d`,
and the real path in the FileSystem also show `/a/b%25c/d`
**while** After `INSERT INTO TABLE t PARTITION(a="%25") SELECT 1`
then `DESC EXTENDED t ` show the location is `/xxx/a=%2525`,
and the real path in the FileSystem also show `/xxx/a=%2525`
**Additionally**, except the location, there are two other factors will affect the location of the table/partition. one is the table name which does not allowed to have special characters, and the other is `partition name` which have the same actions with `partition value`, and `partition name` with special character situation has add some testcase and resolve a bug in [PR](https://github.com/apache/spark/pull/17173)
### Summary:
After `CREATE TABLE t... (PARTITIONED BY ...) LOCATION path`,
the path which we get from `DESC TABLE` and `real path in FileSystem` are all the same with the `CREATE TABLE` command(different filesystem has different action that allow what kind of special character to create the path, e.g. HDFS does not allow colon, but linux filesystem allow it ).
`DataBase` also have the same logic with `CREATE TABLE`
while if the `partition value` has some special character like `%` `:` `#` etc, then we will get the path with encoded `partition value` like `/xxx/a=A%25B` from `DESC TABLE` and `real path in FileSystem`
In this PR, the core change code is using `new Path(str).toUri` and `new Path(uri).toString`
which transfrom `str to uri `or `uri to str`.
for example:
```
val str = '/a/b c/d'
val uri = new Path(str).toUri --> '/a/b%20c/d'
val strFromUri = new Path(uri).toString -> '/a/b c/d'
```
when we restore table/partition from metastore, or get the location from `CREATE TABLE` command, we can use it as above to change string to uri `new Path(str).toUri `
## How was this patch tested?
unit test added.
The `current master branch` also `passed all the test cases` added in this PR by a litter change.
https://github.com/apache/spark/pull/17149/files#diff-b7094baa12601424a5d19cb930e3402fR1764
here `toURI` -> `toString` when test in master branch.
This can show that this PR is transparent for user.
Author: windpiger <songjun@outlook.com>
Closes #17149 from windpiger/changeStringToURI.
Diffstat (limited to 'sql/core/src/test/scala/org/apache')
5 files changed, 123 insertions, 39 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 76bb9e5929..4b73b078da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import java.net.URI + import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier @@ -317,7 +319,7 @@ class DDLCommandSuite extends PlanTest { val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'" val ct = parseAs[CreateTable](query) assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) - assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) + assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } test("create hive table - property values must be set") { @@ -334,7 +336,7 @@ class DDLCommandSuite extends PlanTest { val query = "CREATE TABLE my_tab LOCATION '/something/anything'" val ct = parseAs[CreateTable](query) assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) - assert(ct.tableDesc.storage.locationUri == Some("/something/anything")) + assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } test("create table - with partitioned by") { @@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest { val expectedTableDesc = CatalogTable( identifier = TableIdentifier("my_tab"), tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")), + storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), schema = new StructType().add("a", IntegerType).add("b", StringType), provider = Some("parquet")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 8b8cd0fdf4..6ffa58bcd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -26,9 +26,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION @@ -72,7 +70,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { private def createDatabase(catalog: SessionCatalog, name: String): Unit = { catalog.createDatabase( - CatalogDatabase(name, "", spark.sessionState.conf.warehousePath, Map()), + CatalogDatabase( + name, "", CatalogUtils.stringToURI(spark.sessionState.conf.warehousePath), Map()), ignoreIfExists = false) } @@ -133,11 +132,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } - private def makeQualifiedPath(path: String): String = { + private def makeQualifiedPath(path: String): URI = { // copy-paste from SessionCatalog val hadoopPath = new Path(path) val fs = hadoopPath.getFileSystem(sparkContext.hadoopConfiguration) - fs.makeQualified(hadoopPath).toString + fs.makeQualified(hadoopPath).toUri } test("Create Database using Default Warehouse Path") { @@ -449,7 +448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", location) :: + Row("Location", CatalogUtils.URIToString(location)) :: Row("Properties", "") :: Nil) sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") @@ -458,7 +457,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", location) :: + Row("Location", CatalogUtils.URIToString(location)) :: Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") @@ -467,7 +466,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql(s"DESCRIBE DATABASE EXTENDED $dbName"), Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: - Row("Location", location) :: + Row("Location", CatalogUtils.URIToString(location)) :: Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) } finally { catalog.reset() @@ -1094,7 +1093,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined) assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty) // Verify that the location is set to the expected string - def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = { + def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = { val storageFormat = spec .map { s => catalog.getPartition(tableIdent, s).storage } .getOrElse { catalog.getTableMetadata(tableIdent).storage } @@ -1111,17 +1110,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } // set table location sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'") - verifyLocation("/path/to/your/lovely/heart") + verifyLocation(new URI("/path/to/your/lovely/heart")) // set table partition location sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'") - verifyLocation("/path/to/part/ways", Some(partSpec)) + verifyLocation(new URI("/path/to/part/ways"), Some(partSpec)) // set table location without explicitly specifying database catalog.setCurrentDatabase("dbx") sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'") - verifyLocation("/swanky/steak/place") + verifyLocation(new URI("/swanky/steak/place")) // set table partition location without explicitly specifying database sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'") - verifyLocation("vienna", Some(partSpec)) + verifyLocation(new URI("vienna"), Some(partSpec)) // table to alter does not exist intercept[AnalysisException] { sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'") @@ -1255,7 +1254,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')") assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3)) assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined) - assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris")) + assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris"))) assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined) // add partitions without explicitly specifying database @@ -1819,7 +1818,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // SET LOCATION won't move data from previous table path to new table path. assert(spark.table("tbl").count() == 0) // the previous table path should be still there. - assert(new File(new URI(defaultTablePath)).exists()) + assert(new File(defaultTablePath).exists()) sql("INSERT INTO tbl SELECT 2") checkAnswer(spark.table("tbl"), Row(2)) @@ -1843,28 +1842,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == dir.getAbsolutePath) + assert(table.location == new URI(dir.getAbsolutePath)) dir.delete - val tableLocFile = new File(table.location) - assert(!tableLocFile.exists) + assert(!dir.exists) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") - assert(tableLocFile.exists) + assert(dir.exists) checkAnswer(spark.table("t"), Row("c", 1) :: Nil) Utils.deleteRecursively(dir) - assert(!tableLocFile.exists) + assert(!dir.exists) spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1") - assert(tableLocFile.exists) + assert(dir.exists) checkAnswer(spark.table("t"), Row("c", 1) :: Nil) val newDirFile = new File(dir, "x") - val newDir = newDirFile.toURI.toString + val newDir = newDirFile.getAbsolutePath spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") spark.sessionState.catalog.refreshTable(TableIdentifier("t")) val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table1.location == newDir) + assert(table1.location == new URI(newDir)) assert(!newDirFile.exists) spark.sql("INSERT INTO TABLE t SELECT 'c', 1") @@ -1885,7 +1883,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |LOCATION "$dir" """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == dir.getAbsolutePath) + assert(table.location == new URI(dir.getAbsolutePath)) spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4") checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil) @@ -1911,13 +1909,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |OPTIONS(path "$dir") """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == dir.getAbsolutePath) + assert(table.location == new URI(dir.getAbsolutePath)) dir.delete() checkAnswer(spark.table("t"), Nil) val newDirFile = new File(dir, "x") - val newDir = newDirFile.toURI.toString + val newDir = newDirFile.toURI spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) @@ -1967,7 +1965,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location == dir.getAbsolutePath) + assert(table.location == new URI(dir.getAbsolutePath)) checkAnswer(spark.table("t"), Row(3, 4, 1, 2)) } @@ -1986,7 +1984,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - assert(table.location == dir.getAbsolutePath) + assert(table.location == new URI(dir.getAbsolutePath)) val partDir = new File(dir, "a=3") assert(partDir.exists()) @@ -1996,4 +1994,84 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } } + + Seq("a b", "a:b", "a%b").foreach { specialChars => + test(s"location uri contains $specialChars for datasource table") { + withTable("t", "t1") { + withTempDir { dir => + val loc = new File(dir, specialChars) + loc.mkdir() + spark.sql( + s""" + |CREATE TABLE t(a string) + |USING parquet + |LOCATION '$loc' + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location == new Path(loc.getAbsolutePath).toUri) + assert(new Path(table.location).toString.contains(specialChars)) + + assert(loc.listFiles().isEmpty) + spark.sql("INSERT INTO TABLE t SELECT 1") + assert(loc.listFiles().length >= 1) + checkAnswer(spark.table("t"), Row("1") :: Nil) + } + + withTempDir { dir => + val loc = new File(dir, specialChars) + loc.mkdir() + spark.sql( + s""" + |CREATE TABLE t1(a string, b string) + |USING parquet + |PARTITIONED BY(b) + |LOCATION '$loc' + """.stripMargin) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + assert(table.location == new Path(loc.getAbsolutePath).toUri) + assert(new Path(table.location).toString.contains(specialChars)) + + assert(loc.listFiles().isEmpty) + spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1") + val partFile = new File(loc, "b=2") + assert(partFile.listFiles().length >= 1) + checkAnswer(spark.table("t1"), Row("1", "2") :: Nil) + + spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1") + val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14") + assert(!partFile1.exists()) + val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14") + assert(partFile2.listFiles().length >= 1) + checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil) + } + } + } + } + + Seq("a b", "a:b", "a%b").foreach { specialChars => + test(s"location uri contains $specialChars for database") { + try { + withTable("t") { + withTempDir { dir => + val loc = new File(dir, specialChars) + spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'") + spark.sql("USE tmpdb") + + import testImplicits._ + Seq(1).toDF("a").write.saveAsTable("t") + val tblloc = new File(loc, "t") + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + val tblPath = new Path(tblloc.getAbsolutePath) + val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf()) + assert(table.location == fs.makeQualified(tblPath).toUri) + assert(tblloc.listFiles().nonEmpty) + } + } + } finally { + spark.sql("DROP DATABASE IF EXISTS tmpdb") + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 75723d0abc..989a7f2698 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -459,7 +459,7 @@ class CatalogSuite options = Map("path" -> dir.getAbsolutePath)) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.tableType == CatalogTableType.EXTERNAL) - assert(table.storage.locationUri.get == dir.getAbsolutePath) + assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath)) Seq((1)).toDF("i").write.insertInto("t") assert(dir.exists() && dir.listFiles().nonEmpty) @@ -481,7 +481,7 @@ class CatalogSuite options = Map.empty[String, String]) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.tableType == CatalogTableType.MANAGED) - val tablePath = new File(new URI(table.storage.locationUri.get)) + val tablePath = new File(table.storage.locationUri.get) assert(tablePath.exists() && tablePath.listFiles().isEmpty) Seq((1)).toDF("i").write.insertInto("t") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 9082261af7..93f3efe2cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,7 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier))) + new File(spark.sessionState.catalog.defaultTablePath(identifier)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala index faf9afc49a..7ab339e005 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.sources +import java.net.URI + import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructType} @@ -78,7 +81,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { // should exist even path option is not specified when creating table withTable("src") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") - assert(getPathOption("src") == Some(defaultTablePath("src"))) + assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src")))) } } @@ -105,7 +108,8 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |AS SELECT 1 """.stripMargin) - assert(spark.table("src").schema.head.metadata.getString("path") == defaultTablePath("src")) + assert(spark.table("src").schema.head.metadata.getString("path") == + CatalogUtils.URIToString(defaultTablePath("src"))) } } @@ -123,7 +127,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { withTable("src", "src2") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") sql("ALTER TABLE src RENAME TO src2") - assert(getPathOption("src2") == Some(defaultTablePath("src2"))) + assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2")))) } } @@ -133,7 +137,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { }.head } - private def defaultTablePath(tableName: String): String = { + private def defaultTablePath(tableName: String): URI = { spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName)) } } |