author     Yin Huai <yhuai@databricks.com>      2016-04-30 18:04:36 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-30 18:04:42 -0700
commit     0182d9599d15f70eeb6288bf9294fa677004bd14 (patch)
tree       322c2997d575cd5fa367eed0ef6b9cf2a69ba68b /sql
parent     19a6d192d53ce6dffe998ce110adab1f2efcb23e (diff)
[SPARK-15034][SPARK-15035][SPARK-15036][SQL] Use spark.sql.warehouse.dir as the warehouse location
This PR contains three changes:

1. Use spark.sql.warehouse.dir to set the warehouse location; hive.metastore.warehouse.dir is no longer used.
2. SessionCatalog now sets the location of the default database. Otherwise, when a table is created in a SparkSession without Hive support, the default database's path is an empty string.
3. When a database is created, its path is made fully qualified.

Tested with existing tests and new tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12812 from yhuai/warehouse.
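As an illustration of change (1), the sketch below mirrors what the new SetWarehouseLocationTest added in this patch does: both properties are set on the SparkConf, and spark.sql.warehouse.dir is expected to win. The paths, application name, and local master are placeholders, and SparkSession.withHiveSupport is the internal helper the test itself uses, so the exact entry point in user code may differ.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object WarehouseLocationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WarehouseLocationExample")
      .setMaster("local[*]") // placeholder; a real run would get the master from spark-submit
      // After this patch, spark.sql.warehouse.dir takes precedence over the Hive setting.
      .set("spark.sql.warehouse.dir", "/tmp/example-spark-warehouse")
      .set("hive.metastore.warehouse.dir", "/tmp/ignored-hive-warehouse")
    val sc = new SparkContext(conf)
    // Same entry point as the new test in this patch.
    val spark = SparkSession.withHiveSupport(sc)
    // Tables created in the default database now land under /tmp/example-spark-warehouse.
    spark.sql("CREATE TABLE example_tab (a INT)")
    sc.stop()
  }
}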
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala |  35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala                        |  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala               |   3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala          | 103
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala             |  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala               |   3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala                      |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala          |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                  |   8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala           |  73
10 files changed, 236 insertions, 22 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24bc48..a445a253ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,6 +21,7 @@ import java.io.File
import scala.collection.mutable
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
@@ -44,14 +45,21 @@ class SessionCatalog(
externalCatalog: ExternalCatalog,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: CatalystConf) extends Logging {
+ conf: CatalystConf,
+ hadoopConf: Configuration) extends Logging {
import CatalogTypes.TablePartitionSpec
+ // For testing only.
def this(
externalCatalog: ExternalCatalog,
functionRegistry: FunctionRegistry,
conf: CatalystConf) {
- this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
+ this(
+ externalCatalog,
+ DummyFunctionResourceLoader,
+ functionRegistry,
+ conf,
+ new Configuration())
}
// For testing only.
@@ -68,7 +76,8 @@ class SessionCatalog(
// the corresponding item in the current database.
protected var currentDb = {
val defaultName = "default"
- val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+ val defaultDbDefinition =
+ CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
// Initialize default database if it doesn't already exist
createDatabase(defaultDbDefinition, ignoreIfExists = true)
defaultName
@@ -81,6 +90,18 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}
+ /**
+ * Makes the given path qualified before we store it in the underlying
+ * external catalog, so that a path without a scheme keeps pointing to the
+ * same location even if the default FileSystem changes later.
+ */
+ private def makeQualifiedPath(path: String): Path = {
+ val hadoopPath = new Path(path)
+ val fs = hadoopPath.getFileSystem(hadoopConf)
+ fs.makeQualified(hadoopPath)
+ }
+
// ----------------------------------------------------------------------------
// Databases
// ----------------------------------------------------------------------------
@@ -88,7 +109,10 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+ val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+ externalCatalog.createDatabase(
+ dbDefinition.copy(locationUri = qualifiedPath),
+ ignoreIfExists)
}
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
@@ -125,7 +149,8 @@ class SessionCatalog(
}
def getDefaultDBPath(db: String): String = {
- new Path(new Path(conf.warehousePath), db + ".db").toString
+ val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
+ new Path(new Path(conf.warehousePath), database + ".db").toString
}
// ----------------------------------------------------------------------------
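The makeQualifiedPath helper added above delegates to Hadoop's FileSystem.makeQualified, which attaches a scheme (and, where applicable, an authority) to a scheme-less path. A standalone sketch of that behaviour, using an arbitrary local path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathExample {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val raw = new Path("/tmp/spark-warehouse") // no scheme
    // Resolve the FileSystem that owns this path and qualify the path against it.
    val qualified = raw.getFileSystem(hadoopConf).makeQualified(raw)
    // With the default local FileSystem this prints file:/tmp/spark-warehouse;
    // the qualified form is what ends up stored in the external catalog.
    println(qualified)
  }
}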
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7d3ff9e947..4c2a7b8ae9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.Catalog
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -53,7 +54,7 @@ import org.apache.spark.util.Utils
class SparkSession private(
@transient val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState])
- extends Serializable { self =>
+ extends Serializable with Logging { self =>
def this(sc: SparkContext) {
this(sc, None)
@@ -64,6 +65,19 @@ class SparkSession private(
| Session-related state |
* ----------------------- */
+ {
+ val defaultWarehousePath =
+ SQLConf.WAREHOUSE_PATH
+ .defaultValueString
+ .replace("${system:user.dir}", System.getProperty("user.dir"))
+ val warehousePath = sparkContext.conf.get(
+ SQLConf.WAREHOUSE_PATH.key,
+ defaultWarehousePath)
+ sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, warehousePath)
+ sparkContext.conf.set("hive.metastore.warehouse.dir", warehousePath)
+ logInfo(s"Setting warehouse location to $warehousePath")
+ }
+
/**
* State shared across sessions, including the [[SparkContext]], cached data, listener,
* and a catalog that interacts with external systems.
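The initializer block added to SparkSession above resolves the warehouse path once per SparkContext: spark.sql.warehouse.dir wins if set, otherwise the SQLConf default (with ${system:user.dir} substituted) is used, and the result is mirrored into hive.metastore.warehouse.dir. A simplified sketch of that fallback, with a plain Map standing in for SparkConf and an assumed default template string:

object WarehousePathFallback {
  // "defaultTemplate" approximates SQLConf.WAREHOUSE_PATH.defaultValueString;
  // the exact default string is an assumption here.
  def resolveWarehousePath(sparkConf: Map[String, String]): String = {
    val defaultTemplate = "${system:user.dir}/spark-warehouse"
    val defaultWarehousePath =
      defaultTemplate.replace("${system:user.dir}", System.getProperty("user.dir"))
    sparkConf.getOrElse("spark.sql.warehouse.dir", defaultWarehousePath)
  }

  def main(args: Array[String]): Unit = {
    // Nothing configured: the warehouse falls back to <cwd>/spark-warehouse.
    println(resolveWarehousePath(Map.empty))
    // An explicit setting wins.
    println(resolveWarehousePath(Map("spark.sql.warehouse.dir" -> "/tmp/my-warehouse")))
  }
}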
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 6fa044aee0..ebff756979 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -99,7 +99,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
sparkSession.externalCatalog,
functionResourceLoader,
functionRegistry,
- conf)
+ conf,
+ newHadoopConf())
/**
* Interface exposed to the user for registering user-defined functions.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4162329d76..12acb9f276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
import java.io.File
+import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -64,15 +65,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
- catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
+ catalog.createDatabase(
+ CatalogDatabase(name, "", sqlContext.conf.warehousePath, Map()), ignoreIfExists = false)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+ val storage =
+ CatalogStorageFormat(
+ locationUri = Some(catalog.defaultTablePath(name)),
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map())
catalog.createTable(CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat(None, None, None, None, Map()),
- schema = Seq()), ignoreIfExists = false)
+ storage = storage,
+ schema = Seq(),
+ createTime = 0L), ignoreIfExists = false)
}
private def createTablePartition(
@@ -83,6 +93,29 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
+ test("the qualified path of a database is stored in the catalog") {
+ val catalog = sqlContext.sessionState.catalog
+
+ val path = System.getProperty("java.io.tmpdir")
+ // The generated temp path is not qualified.
+ assert(!path.startsWith("file:/"))
+ sql(s"CREATE DATABASE db1 LOCATION '$path'")
+ val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri
+ assert("file" === pathInCatalog.getScheme)
+ assert(path === pathInCatalog.getPath)
+
+ withSQLConf(
+ SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) {
+ sql(s"CREATE DATABASE db2")
+ val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri
+ assert("file" === pathInCatalog.getScheme)
+ assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath)
+ }
+
+ sql("DROP DATABASE db1")
+ sql("DROP DATABASE db2")
+ }
+
test("Create/Drop Database") {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
@@ -96,10 +129,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ val expectedLocation =
+ "file:" + System.getProperty("java.io.tmpdir") +
+ File.separator + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+ expectedLocation,
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -121,10 +157,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ val expectedLocation =
+ "file:" + System.getProperty("java.io.tmpdir") +
+ File.separator + s"$dbNameWithoutBackTicks.db"
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+ expectedLocation,
Map.empty))
val message = intercept[AnalysisException] {
@@ -148,7 +187,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+ "file:" + System.getProperty("java.io.tmpdir") +
+ File.separator + s"$dbNameWithoutBackTicks.db"
+
sql(s"CREATE DATABASE $dbName")
checkAnswer(
@@ -210,6 +251,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// TODO: test drop database in restrict mode
+ test("create table in default db") {
+ val catalog = sqlContext.sessionState.catalog
+ val tableIdent1 = TableIdentifier("tab1", None)
+ createTable(catalog, tableIdent1)
+ val expectedTableIdent = tableIdent1.copy(database = Some("default"))
+ val expectedLocation =
+ catalog.getDatabaseMetadata("default").locationUri + "/tab1"
+ val expectedStorage =
+ CatalogStorageFormat(
+ locationUri = Some(expectedLocation),
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map())
+ val expectedTable =
+ CatalogTable(
+ identifier = expectedTableIdent,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = expectedStorage,
+ schema = Seq(),
+ createTime = 0L)
+ assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ }
+
+ test("create table in a specific db") {
+ val catalog = sqlContext.sessionState.catalog
+ createDatabase(catalog, "dbx")
+ val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
+ createTable(catalog, tableIdent1)
+ val expectedLocation =
+ catalog.getDatabaseMetadata("dbx").locationUri + "/tab1"
+ val expectedStorage =
+ CatalogStorageFormat(
+ locationUri = Some(expectedLocation),
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ serdeProperties = Map())
+ val expectedTable =
+ CatalogTable(
+ identifier = tableIdent1,
+ tableType = CatalogTableType.EXTERNAL,
+ storage = expectedStorage,
+ schema = Seq(),
+ createTime = 0L)
+ assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+ }
+
test("alter table: rename") {
val catalog = sqlContext.sessionState.catalog
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
@@ -534,7 +623,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 456587e0e0..f023edbd96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -44,8 +45,14 @@ private[sql] class HiveSessionCatalog(
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: SQLConf)
- extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
+ conf: SQLConf,
+ hadoopConf: Configuration)
+ extends SessionCatalog(
+ externalCatalog,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ hadoopConf) {
override def setCurrentDatabase(db: String): Unit = {
super.setCurrentDatabase(db)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 57aa4b2931..31f28f205f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -54,7 +54,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
sparkSession,
functionResourceLoader,
functionRegistry,
- conf)
+ conf,
+ newHadoopConf())
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index be89edbad7..d033b05d48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -379,7 +379,7 @@ private[spark] object HiveUtils extends Logging {
propMap.put(confvar.varname, confvar.getDefaultExpr())
}
}
- propMap.put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, localMetastore.toURI.toString)
+ propMap.put(SQLConf.WAREHOUSE_PATH.key, localMetastore.toURI.toString)
propMap.put(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
s"jdbc:derby:${withInMemoryMode};databaseName=${localMetastore.getAbsolutePath};create=true")
propMap.put("datanucleus.rdbms.datastoreAdapterClassName",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 78ba2bfda6..cdfadfaaea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -179,7 +179,7 @@ private[hive] class HiveClientImpl(
// Log the default warehouse location.
logInfo(
- s"Default warehouse location for Hive client " +
+ s"Warehouse location for Hive client " +
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
/** Returns the configuration for the current session. */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b41d882ffa..42746ece3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -115,6 +115,12 @@ private[hive] class TestHiveSparkSession(
@transient private val existingSharedState: Option[TestHiveSharedState])
extends SparkSession(sc) with Logging { self =>
+ // TODO: We need to set the temp warehouse path in sc's conf.
+ // Right now, in SparkSession, we set the warehouse path to the default one
+ // instead of the temp one. Then, we override the setting in TestHiveSharedState
+ // when we create metadataHive. This flow is not easy to follow and can confuse
+ // a developer who is debugging an issue. We need to refactor this part
+ // to just set the temp warehouse path in sc's conf.
def this(sc: SparkContext) {
this(
sc,
@@ -573,6 +579,8 @@ private[hive] object TestHiveContext {
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
+ // Override WAREHOUSE_PATH and METASTOREWAREHOUSE to use the given path.
+ SQLConf.WAREHOUSE_PATH.key -> warehousePath.toURI.toString,
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index cc05e1d1d7..77a6a94a67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
-import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -202,6 +202,19 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("set spark.sql.warehouse.dir") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+ "--name", "SetWarehouseLocationTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -262,6 +275,62 @@ class HiveSparkSubmitSuite
}
}
+object SetWarehouseLocationTest extends Logging {
+ def main(args: Array[String]): Unit = {
+ Utils.configTestLog4j("INFO")
+ val warehouseLocation = Utils.createTempDir()
+ warehouseLocation.delete()
+ val hiveWarehouseLocation = Utils.createTempDir()
+ hiveWarehouseLocation.delete()
+
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ // We will use the value of spark.sql.warehouse.dir to override the
+ // value of hive.metastore.warehouse.dir.
+ conf.set("spark.sql.warehouse.dir", warehouseLocation.toString)
+ conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+
+ val sc = new SparkContext(conf)
+ val sparkSession = SparkSession.withHiveSupport(sc)
+ val catalog = sparkSession.sessionState.catalog
+
+ sparkSession.sql("drop table if exists testLocation")
+ sparkSession.sql("drop database if exists testLocationDB cascade")
+
+ {
+ sparkSession.sql("create table testLocation (a int)")
+ val tableMetadata =
+ catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
+ val expectedLocation =
+ "file:" + warehouseLocation.toString + "/testlocation"
+ val actualLocation = tableMetadata.storage.locationUri.get
+ if (actualLocation != expectedLocation) {
+ throw new Exception(
+ s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+ }
+ sparkSession.sql("drop table testLocation")
+ }
+
+ {
+ sparkSession.sql("create database testLocationDB")
+ sparkSession.sql("use testLocationDB")
+ sparkSession.sql("create table testLocation (a int)")
+ val tableMetadata =
+ catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
+ val expectedLocation =
+ "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+ val actualLocation = tableMetadata.storage.locationUri.get
+ if (actualLocation != expectedLocation) {
+ throw new Exception(
+ s"Expected table location is $expectedLocation. But, it is actually $actualLocation")
+ }
+ sparkSession.sql("drop table testLocation")
+ sparkSession.sql("use default")
+ sparkSession.sql("drop database testLocationDB")
+ }
+ }
+}
+
// This application is used to test defining a new Hive UDF (with an associated jar)
// and use this UDF. We need to run this test in separate JVM to make sure we
// can load the jar defined with the function.