author    | Yin Huai <yhuai@databricks.com>   | 2016-04-30 18:04:36 -0700
committer | Reynold Xin <rxin@databricks.com> | 2016-04-30 18:04:42 -0700
commit    | 0182d9599d15f70eeb6288bf9294fa677004bd14
tree      | 322c2997d575cd5fa367eed0ef6b9cf2a69ba68b
parent    | 19a6d192d53ce6dffe998ce110adab1f2efcb23e
[SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
This PR contains three changes:
1. We now use spark.sql.warehouse.dir to set the warehouse location; hive.metastore.warehouse.dir is no longer used.
2. SessionCatalog now sets the location of the default database. Otherwise, when a table is created in a SparkSession without Hive support, the default database's path would be an empty string.
3. When we create a database, we make its path fully qualified before storing it in the catalog (a sketch follows this list).
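To illustrate change 3, here is a minimal, standalone sketch of qualifying a path against the default Hadoop FileSystem; it mirrors the makeQualifiedPath helper added to SessionCatalog in the diff below, and the example input path is arbitrary:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Qualify a raw path against the default FileSystem, so the stored URI keeps
// its scheme even if the default FileSystem changes later.
def qualify(path: String, hadoopConf: Configuration): Path = {
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(hadoopConf)
  fs.makeQualified(hadoopPath)
}

// e.g. "/tmp/db1.db" typically becomes "file:/tmp/db1.db" on a local setup
println(qualify("/tmp/db1.db", new Configuration()))
```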
Tested with existing tests and new tests.
Author: Yin Huai <yhuai@databricks.com>
Closes #12812 from yhuai/warehouse.
10 files changed, 236 insertions, 22 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24bc48..a445a253ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,7 @@ import java.io.File
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -44,14 +45,21 @@ class SessionCatalog(
     externalCatalog: ExternalCatalog,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: CatalystConf) extends Logging {
+    conf: CatalystConf,
+    hadoopConf: Configuration) extends Logging {
   import CatalogTypes.TablePartitionSpec
 
+  // For testing only.
   def this(
       externalCatalog: ExternalCatalog,
       functionRegistry: FunctionRegistry,
       conf: CatalystConf) {
-    this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
+    this(
+      externalCatalog,
+      DummyFunctionResourceLoader,
+      functionRegistry,
+      conf,
+      new Configuration())
   }
 
   // For testing only.
@@ -68,7 +76,8 @@ class SessionCatalog(
   // the corresponding item in the current database.
   protected var currentDb = {
     val defaultName = "default"
-    val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+    val defaultDbDefinition =
+      CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
     // Initialize default database if it doesn't already exist
     createDatabase(defaultDbDefinition, ignoreIfExists = true)
     defaultName
@@ -81,6 +90,18 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * This method is used to make the given path qualified before we
+   * store this path in the underlying external catalog. So, when a path
+   * does not contain a scheme, this path will not be changed after the default
+   * FileSystem is changed.
+   */
+  private def makeQualifiedPath(path: String): Path = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(hadoopConf)
+    fs.makeQualified(hadoopPath)
+  }
+
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
@@ -88,7 +109,10 @@ class SessionCatalog(
   // ----------------------------------------------------------------------------
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
-    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    externalCatalog.createDatabase(
+      dbDefinition.copy(locationUri = qualifiedPath),
+      ignoreIfExists)
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
@@ -125,7 +149,8 @@ class SessionCatalog(
   }
 
   def getDefaultDBPath(db: String): String = {
-    new Path(new Path(conf.warehousePath), db + ".db").toString
+    val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+    new Path(new Path(conf.warehousePath), database + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7d3ff9e947..4c2a7b8ae9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalog.Catalog
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -53,7 +54,7 @@ import org.apache.spark.util.Utils
 class SparkSession private(
     @transient val sparkContext: SparkContext,
     @transient private val existingSharedState: Option[SharedState])
-  extends Serializable { self =>
+  extends Serializable with Logging { self =>
 
   def this(sc: SparkContext) {
     this(sc, None)
@@ -64,6 +65,19 @@ class SparkSession private(
   |  Session-related state  |
    * ----------------------- */
 
+  {
+    val defaultWarehousePath =
+      SQLConf.WAREHOUSE_PATH
+        .defaultValueString
+        .replace("${system:user.dir}", System.getProperty("user.dir"))
+    val warehousePath = sparkContext.conf.get(
+      SQLConf.WAREHOUSE_PATH.key,
+      defaultWarehousePath)
+    sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, warehousePath)
+    sparkContext.conf.set("hive.metastore.warehouse.dir", warehousePath)
+    logInfo(s"Setting warehouse location to $warehousePath")
+  }
+
   /**
    * State shared across sessions, including the [[SparkContext]], cached data, listener,
    * and a catalog that interacts with external systems.
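The block added to SparkSession above resolves the effective warehouse location once per session. As a rough, standalone sketch of that precedence rule (an explicit spark.sql.warehouse.dir wins over the built-in default, and the resolved value is mirrored into hive.metastore.warehouse.dir) — the default value string is hard-coded here as an assumption rather than read from SQLConf:

```scala
import org.apache.spark.SparkConf

// Sketch of the warehouse-path resolution; "${system:user.dir}/spark-warehouse"
// is assumed to be the default value string for illustration.
def resolveWarehousePath(conf: SparkConf): String = {
  val defaultWarehousePath =
    "${system:user.dir}/spark-warehouse"
      .replace("${system:user.dir}", System.getProperty("user.dir"))
  val warehousePath = conf.get("spark.sql.warehouse.dir", defaultWarehousePath)
  // Propagate the resolved value so Hive components see the same location.
  conf.set("spark.sql.warehouse.dir", warehousePath)
  conf.set("hive.metastore.warehouse.dir", warehousePath)
  warehousePath
}

// Usage: an explicit setting takes precedence over the default.
val conf = new SparkConf().set("spark.sql.warehouse.dir", "/tmp/my-warehouse")
assert(resolveWarehousePath(conf) == "/tmp/my-warehouse")
```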
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 6fa044aee0..ebff756979 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -99,7 +99,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
     sparkSession.externalCatalog,
     functionResourceLoader,
     functionRegistry,
-    conf)
+    conf,
+    newHadoopConf())
 
   /**
    * Interface exposed to the user for registering user-defined functions.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4162329d76..12acb9f276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -64,15 +65,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
-    catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
+    catalog.createDatabase(
+      CatalogDatabase(name, "", sqlContext.conf.warehousePath, Map()), ignoreIfExists = false)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    val storage =
+      CatalogStorageFormat(
+        locationUri = Some(catalog.defaultTablePath(name)),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
     catalog.createTable(CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat(None, None, None, None, Map()),
-      schema = Seq()), ignoreIfExists = false)
+      storage = storage,
+      schema = Seq(),
+      createTime = 0L), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -83,6 +93,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
+  test("the qualified path of a database is stored in the catalog") {
+    val catalog = sqlContext.sessionState.catalog
+
+    val path = System.getProperty("java.io.tmpdir")
+    // The generated temp path is not qualified.
+    assert(!path.startsWith("file:/"))
+    sql(s"CREATE DATABASE db1 LOCATION '$path'")
+    val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
+    assert("file" === pathInCatalog.getScheme)
+    assert(path === pathInCatalog.getPath)
+
+    withSQLConf(
+      SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
+      sql(s"CREATE DATABASE db2")
+      val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+      assert("file" === pathInCatalog.getScheme)
+      assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
+    }
+
+    sql("DROP DATABASE db1")
+    sql("DROP DATABASE db2")
+  }
+
   test("Create/Drop Database") {
     withSQLConf(
       SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
@@ -96,10 +129,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+        val expectedLocation =
+          "file:" + System.getProperty("java.io.tmpdir") +
+            File.separator + s"$dbNameWithoutBackTicks.db"
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
-          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+          expectedLocation,
           Map.empty))
         sql(s"DROP DATABASE $dbName CASCADE")
         assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -121,10 +157,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+        val expectedLocation =
+          "file:" + System.getProperty("java.io.tmpdir") +
+            File.separator + s"$dbNameWithoutBackTicks.db"
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
-          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+          expectedLocation,
           Map.empty))
 
         val message = intercept[AnalysisException] {
@@ -148,7 +187,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         val location =
-          System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+          "file:" + System.getProperty("java.io.tmpdir") +
+            File.separator + s"$dbNameWithoutBackTicks.db"
+
         sql(s"CREATE DATABASE $dbName")
 
         checkAnswer(
@@ -210,6 +251,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   // TODO: test drop database in restrict mode
 
+  test("create table in default db") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent1 = TableIdentifier("tab1", None)
+    createTable(catalog, tableIdent1)
+    val expectedTableIdent = tableIdent1.copy(database = Some("default"))
+    val expectedLocation =
+      catalog.getDatabaseMetadata("default").locationUri + "/tab1"
+    val expectedStorage =
+      CatalogStorageFormat(
+        locationUri = Some(expectedLocation),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
+    val expectedTable =
+      CatalogTable(
+        identifier = expectedTableIdent,
+        tableType = CatalogTableType.EXTERNAL,
+        storage = expectedStorage,
+        schema = Seq(),
+        createTime = 0L)
+    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+  }
+
+  test("create table in a specific db") {
+    val catalog = sqlContext.sessionState.catalog
+    createDatabase(catalog, "dbx")
+    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
+    createTable(catalog, tableIdent1)
+    val expectedLocation =
+      catalog.getDatabaseMetadata("dbx").locationUri + "/tab1"
+    val expectedStorage =
+      CatalogStorageFormat(
+        locationUri = Some(expectedLocation),
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        serdeProperties = Map())
+    val expectedTable =
+      CatalogTable(
+        identifier = tableIdent1,
+        tableType = CatalogTableType.EXTERNAL,
+        storage = expectedStorage,
+        schema = Seq(),
+        createTime = 0L)
+    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+  }
+
   test("alter table: rename") {
     val catalog = sqlContext.sessionState.catalog
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
@@ -534,7 +623,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
     assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 456587e0e0..f023edbd96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -44,8 +45,14 @@ private[sql] class HiveSessionCatalog(
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf)
-  extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends SessionCatalog(
+    externalCatalog,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) {
 
   override def setCurrentDatabase(db: String): Unit = {
     super.setCurrentDatabase(db)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 57aa4b2931..31f28f205f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -54,7 +54,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       sparkSession,
       functionResourceLoader,
       functionRegistry,
-      conf)
+      conf,
+      newHadoopConf())
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index be89edbad7..d033b05d48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -379,7 +379,7 @@ private[spark] object HiveUtils extends Logging {
         propMap.put(confvar.varname, confvar.getDefaultExpr())
       }
     }
-    propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString)
+    propMap.put(SQLConf.WAREHOUSE_PATH.key, localMetastore.toURI.toString)
     propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
       s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
     propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 78ba2bfda6..cdfadfaaea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -179,7 +179,7 @@ private[hive] class HiveClientImpl(
 
   // Log the default warehouse location.
   logInfo(
-    s"Default warehouse location for Hive client " +
+    s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
   /** Returns the configuration for the current session. */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b41d882ffa..42746ece3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -115,6 +115,12 @@ private[hive] class TestHiveSparkSession(
     @transient private val existingSharedState: Option[TestHiveSharedState])
   extends SparkSession(sc) with Logging { self =>
 
+  // TODO: We need to set the temp warehouse path to sc's conf.
+  // Right now, In SparkSession, we will set the warehouse path to the default one
+  // instead of the temp one. Then, we override the setting in TestHiveSharedState
+  // when we creating metadataHive. This flow is not easy to follow and can introduce
+  // confusion when a developer is debugging an issue. We need to refactor this part
+  // to just set the temp warehouse path in sc's conf.
   def this(sc: SparkContext) {
     this(
       sc,
@@ -573,6 +579,8 @@ private[hive] object TestHiveContext {
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
     HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
+      // Override WAREHOUSE_PATH and METASTOREWAREHOUSE to use the given path.
+      SQLConf.WAREHOUSE_PATH.key -> warehousePath.toURI.toString,
      ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
      ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
      ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
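Test code, as in the TestHive change above, points both the Spark and Hive warehouse settings at a throwaway directory so runs stay isolated. A hedged sketch of that pattern outside the test harness — the two keys are the real configuration names, while the temp-directory handling is just an example, and TestHiveContext's real map also covers the Derby metastore and scratch directories:

```scala
import java.io.File
import java.nio.file.Files

// Build a throwaway warehouse directory and the config entries that point
// both Spark and Hive at it.
val warehouseDir: File = Files.createTempDirectory("spark-warehouse-").toFile
val warehouseConf: Map[String, String] = Map(
  "spark.sql.warehouse.dir" -> warehouseDir.toURI.toString,
  "hive.metastore.warehouse.dir" -> warehouseDir.toURI.toString)
```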
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index cc05e1d1d7..77a6a94a67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
-import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -202,6 +202,19 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("set spark.sql.warehouse.dir") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+      "--name", "SetWarehouseLocationTest",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--driver-java-options", "-Dderby.system.durability=test",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -262,6 +275,62 @@ class HiveSparkSubmitSuite
   }
 }
 
+object SetWarehouseLocationTest extends Logging {
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+    val warehouseLocation = Utils.createTempDir()
+    warehouseLocation.delete()
+    val hiveWarehouseLocation = Utils.createTempDir()
+    hiveWarehouseLocation.delete()
+
+    val conf = new SparkConf()
+    conf.set("spark.ui.enabled", "false")
+    // We will use the value of spark.sql.warehouse.dir override the
+    // value of hive.metastore.warehouse.dir.
+    conf.set("spark.sql.warehouse.dir", warehouseLocation.toString)
+    conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+
+    val sc = new SparkContext(conf)
+    val sparkSession = SparkSession.withHiveSupport(sc)
+    val catalog = sparkSession.sessionState.catalog
+
+    sparkSession.sql("drop table if exists testLocation")
+    sparkSession.sql("drop database if exists testLocationDB cascade")
+
+    {
+      sparkSession.sql("create table testLocation (a int)")
+      val tableMetadata =
+        catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
+      val expectedLocation =
+        "file:" + warehouseLocation.toString + "/testlocation"
+      val actualLocation = tableMetadata.storage.locationUri.get
+      if (actualLocation != expectedLocation) {
+        throw new Exception(
+          s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+      }
+      sparkSession.sql("drop table testLocation")
+    }
+
+    {
+      sparkSession.sql("create database testLocationDB")
+      sparkSession.sql("use testLocationDB")
+      sparkSession.sql("create table testLocation (a int)")
+      val tableMetadata =
+        catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
+      val expectedLocation =
+        "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+      val actualLocation = tableMetadata.storage.locationUri.get
+      if (actualLocation != expectedLocation) {
+        throw new Exception(
+          s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+      }
+      sparkSession.sql("drop table testLocation")
+      sparkSession.sql("use default")
+      sparkSession.sql("drop database testLocationDB")
+    }
+  }
+}
+
 // This application is used to test defining a new Hive UDF (with an associated jar)
 // and use this UDF. We need to run this test in separate JVM to make sure we
 // can load the jar defined with the function.