 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 14
 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala          | 50
 sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala               |  3
 sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala             |  9
 sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala                   |  5
 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala      |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala           |  6
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala         |  4
 8 files changed, 64 insertions(+), 31 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6cfc4a4321..bfcdb70fe4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -259,7 +259,19 @@ class SessionCatalog(
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
validateName(table)
- val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+
+ val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
+ && !tableDefinition.storage.locationUri.get.isAbsolute) {
+ // make the location of the table qualified.
+ val qualifiedTableLocation =
+ makeQualifiedPath(tableDefinition.storage.locationUri.get)
+ tableDefinition.copy(
+ storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
+ identifier = TableIdentifier(table, Some(db)))
+ } else {
+ tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
+ }
+
requireDbExists(db)
externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}
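
This is the core change: when a CatalogTable arrives with a location URI that is not absolute, createTable now qualifies it before the definition reaches the external catalog, so the stored location always carries a scheme (and, on HDFS, an authority). The makeQualifiedPath helper called above is defined elsewhere in SessionCatalog and is not part of this hunk; a minimal sketch of its assumed behavior, modeled on the String-based SQLTestUtils helper that survives later in this diff, looks like:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Sketch only: resolve a possibly relative, scheme-less URI against the
    // FileSystem selected by the Hadoop configuration, so that on a local
    // filesystem "/tmp/warehouse/t" becomes "file:/tmp/warehouse/t".
    def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = {
      val hadoopPath = new Path(path)
      val fs = hadoopPath.getFileSystem(hadoopConf)
      fs.makeQualified(hadoopPath).toUri
    }
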
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index aa335c4453..5f70a8ce89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -230,8 +230,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
private def getDBPath(dbName: String): URI = {
- val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
- new Path(warehousePath, s"$dbName.db").toUri
+ val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+ new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
}
test("the qualified path of a database is stored in the catalog") {
@@ -1360,7 +1360,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val partitionLocation = if (isUsingHiveMetastore) {
val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
assert(tableLocation.isDefined)
- makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+ makeQualifiedPath(new Path(tableLocation.get.toString, "paris").toString)
} else {
new URI("paris")
}
@@ -1909,7 +1909,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
dir.delete
assert(!dir.exists)
@@ -1950,7 +1950,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|LOCATION "$dir"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1976,7 +1976,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|OPTIONS(path "$dir")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
dir.delete()
checkAnswer(spark.table("t"), Nil)
@@ -2032,7 +2033,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -2051,7 +2052,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())
@@ -2099,7 +2100,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -2120,7 +2121,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new Path(loc.getAbsolutePath).toUri)
+ assert(table.location == makeQualifiedPath(loc.getAbsolutePath))
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
@@ -2162,4 +2163,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("the qualified path of a datasource table is stored in the catalog") {
+ withTable("t", "t1") {
+ withTempDir { dir =>
+ assert(!dir.getAbsolutePath.startsWith("file:/"))
+ spark.sql(
+ s"""
+ |CREATE TABLE t(a string)
+ |USING parquet
+ |LOCATION '$dir'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.location.toString.startsWith("file:/"))
+ }
+
+ withTempDir { dir =>
+ assert(!dir.getAbsolutePath.startsWith("file:/"))
+ spark.sql(
+ s"""
+ |CREATE TABLE t1(a string, b string)
+ |USING parquet
+ |PARTITIONED BY(b)
+ |LOCATION '$dir'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+ assert(table.location.toString.startsWith("file:/"))
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index fcb8ffbc6e..9742b3b2d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.internal
import java.io.File
-import java.net.URI
import org.scalatest.BeforeAndAfterEach
@@ -459,7 +458,7 @@ class CatalogSuite
options = Map("path" -> dir.getAbsolutePath))
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.tableType == CatalogTableType.EXTERNAL)
- assert(table.storage.locationUri.get == new URI(dir.getAbsolutePath))
+ assert(table.storage.locationUri.get == makeQualifiedPath(dir.getAbsolutePath))
Seq((1)).toDF("i").write.insertInto("t")
assert(dir.exists() && dir.listFiles().nonEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 7ab339e005..60adee4599 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,7 +75,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
|USING ${classOf[TestOptionsSource].getCanonicalName}
|OPTIONS (PATH '/tmp/path')
""".stripMargin)
- assert(getPathOption("src") == Some("/tmp/path"))
+ assert(getPathOption("src") == Some("file:/tmp/path"))
}
// should exist even if the path option is not specified when creating the table
@@ -88,15 +88,16 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
test("path option also exist for write path") {
withTable("src") {
withTempPath { p =>
- val path = new Path(p.getAbsolutePath).toString
sql(
s"""
|CREATE TABLE src
|USING ${classOf[TestOptionsSource].getCanonicalName}
- |OPTIONS (PATH '$path')
+ |OPTIONS (PATH '$p')
|AS SELECT 1
""".stripMargin)
- assert(spark.table("src").schema.head.metadata.getString("path") == path)
+ assert(CatalogUtils.stringToURI(
+ spark.table("src").schema.head.metadata.getString("path")) ==
+ makeQualifiedPath(p.getAbsolutePath))
}
}
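
With locations now stored fully qualified, a raw string comparison against p.getAbsolutePath would fail (the stored value gains a "file:" scheme), so the test converts the stored "path" property back into a URI before comparing it to the qualified path. CatalogUtils.stringToURI is not shown in this diff; assuming it simply parses a catalog path string into a java.net.URI, a sketch is:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Sketch only: turn a catalog path string back into a URI for comparison.
    def stringToURI(str: String): URI = new Path(str).toUri
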
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 12fc8993d7..9201954b66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,11 +306,6 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}
-
- def makeQualifiedPath(path: Path): URI = {
- val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
- fs.makeQualified(path).toUri
- }
}
private[sql] object SQLTestUtils {
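
Only the Path-based overload is removed here; the String-based makeQualifiedPath (visible in the context at the top of this hunk) remains, and call sites now stringify a Path before calling it. The VersionsSuite change below shows the resulting call shape:

    // Usage shape after this cleanup (tPath is an org.apache.hadoop.fs.Path):
    makeQualifiedPath(tPath.toString)  // instead of makeQualifiedPath(tPath)
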
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index cf552b4a88..079358b29a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
-import java.net.URI
-
import org.apache.spark.sql.{QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -142,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
assert(hiveTable.storage.serde === Some(serde))
assert(hiveTable.tableType === CatalogTableType.EXTERNAL)
- assert(hiveTable.storage.locationUri === Some(new URI(path.getAbsolutePath)))
+ assert(hiveTable.storage.locationUri === Some(makeQualifiedPath(dir.getAbsolutePath)))
val columns = hiveTable.schema
assert(columns.map(_.name) === Seq("d1", "d2"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dd624eca6b..6025f8adbc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -658,19 +658,17 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
Seq("1").toDF("a").write.saveAsTable("t")
- val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}"
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == CatalogUtils.stringToURI(expectedPath))
+ assert(table.location == makeQualifiedPath(tPath.toString))
assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
checkAnswer(spark.table("t"), Row("1") :: Nil)
val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
spark.sql("create table t1 using parquet as select 2 as a")
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}"
- assert(table1.location == CatalogUtils.stringToURI(expectedPath1))
+ assert(table1.location == makeQualifiedPath(t1Path.toString))
assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
checkAnswer(spark.table("t1"), Row(2) :: Nil)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fce055048d..23aea24697 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1681,7 +1681,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
}
@@ -1701,7 +1701,7 @@ class HiveDDLSuite
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
- assert(table.location == new URI(dir.getAbsolutePath))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
val partDir = new File(dir, "a=3")
assert(partDir.exists())