author    Yin Huai <yhuai@databricks.com>    2016-06-15 11:50:54 -0700
committer Reynold Xin <rxin@databricks.com>    2016-06-15 11:50:54 -0700
commit    e1585cc74853c497271eecdc943c0eabe1aeb4c1 (patch)
tree      0cb8c511b9dc26fecca4b5a833298e551562dc01
parent    9a5071996b968148f6b9aba12e0d3fe888d9acd8 (diff)
[SPARK-15959][SQL] Add the support of hive.metastore.warehouse.dir back
## What changes were proposed in this pull request?

This PR adds back support for the conf `hive.metastore.warehouse.dir`. With this patch, the warehouse dir is determined as follows:

* If `spark.sql.warehouse.dir` is set, `hive.metastore.warehouse.dir` will be automatically set to the value of `spark.sql.warehouse.dir`. The warehouse dir is effectively the value of `spark.sql.warehouse.dir`.
* If `spark.sql.warehouse.dir` is not set but `hive.metastore.warehouse.dir` is set, `spark.sql.warehouse.dir` will be automatically set to the value of `hive.metastore.warehouse.dir`. The warehouse dir is effectively the value of `hive.metastore.warehouse.dir`.
* If neither `spark.sql.warehouse.dir` nor `hive.metastore.warehouse.dir` is set, `hive.metastore.warehouse.dir` will be automatically set to the default value of `spark.sql.warehouse.dir`. The warehouse dir is effectively the default value of `spark.sql.warehouse.dir`.

## How was this patch tested?

A new `set hive.metastore.warehouse.dir` test in `HiveSparkSubmitSuite`.

JIRA: https://issues.apache.org/jira/browse/SPARK-15959

Author: Yin Huai <yhuai@databricks.com>

Closes #13679 from yhuai/hiveWarehouseDir.
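The precedence above can be summarized in a short standalone sketch. This is not code from the patch; `resolveWarehouseDir` and its parameters are hypothetical names used only to illustrate the three cases:

```scala
// Hypothetical helper illustrating the warehouse-dir precedence rules.
def resolveWarehouseDir(
    sparkWarehouseDir: Option[String],   // spark.sql.warehouse.dir, if set
    hiveWarehouseDir: Option[String],    // hive.metastore.warehouse.dir, if set
    sparkWarehouseDefault: String        // default of spark.sql.warehouse.dir
): String =
  (sparkWarehouseDir, hiveWarehouseDir) match {
    // spark.sql.warehouse.dir always wins when it is set.
    case (Some(sparkDir), _) => sparkDir
    // Otherwise hive.metastore.warehouse.dir is respected.
    case (None, Some(hiveDir)) => hiveDir
    // Neither is set: fall back to the default of spark.sql.warehouse.dir.
    case (None, None) => sparkWarehouseDefault
  }

assert(resolveWarehouseDir(Some("/a"), Some("/b"), "/d") == "/a")
assert(resolveWarehouseDir(None, Some("/b"), "/d") == "/b")
assert(resolveWarehouseDir(None, None, "/d") == "/d")
```

Whichever branch fires, the patch then writes the resolved value back so both configs agree on the effective location.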
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala      29
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala      13
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala  91
3 files changed, 106 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c37f7f12ac..bc349b4f28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
@@ -30,7 +31,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*/
-private[sql] class SharedState(val sparkContext: SparkContext) {
+private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* Class for caching query results reused in future executions.
@@ -46,7 +47,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
* The base hadoop configuration which is shared among all spark sessions. It is based on the
* default hadoop configuration of Spark, with custom configurations inside `hive-site.xml`.
*/
- lazy val hadoopConf: Configuration = {
+ val hadoopConf: Configuration = {
val conf = new Configuration(sparkContext.hadoopConfiguration)
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
@@ -66,6 +67,30 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
val jarClassLoader = new NonClosableMutableURLClassLoader(
org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+ {
+ // Set the Hive metastore warehouse path to the one we use
+ val tempConf = new SQLConf
+ sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
+ val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
+ if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
+ // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+ // we will respect the value of hive.metastore.warehouse.dir.
+ tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+ sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
+ logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
+ s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
+ s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
+ } else {
+ // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+ // the value of spark.sql.warehouse.dir.
+ // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+ // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+ sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
+ }
+
+ logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
+ }
+
/**
* Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
*/
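As a sanity check of the initialization block above, here is a minimal sketch of the override direction (assumes a build with this patch applied; the app name and `/tmp/spark-warehouse` path are illustrative): setting only `spark.sql.warehouse.dir` should cause `hive.metastore.warehouse.dir` to be rewritten to match.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("warehouse-dir-sketch")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .getOrCreate()

// Both keys should now agree on the effective warehouse location.
println(spark.conf.get("spark.sql.warehouse.dir"))                      // /tmp/spark-warehouse
println(spark.sparkContext.getConf.get("hive.metastore.warehouse.dir")) // /tmp/spark-warehouse
spark.stop()
```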
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index 78b1ecbbea..6b7a333f2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -18,9 +18,8 @@
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SharedState
/**
@@ -28,18 +27,10 @@ import org.apache.spark.sql.internal.{SharedState, SQLConf}
* [[org.apache.spark.sql.SparkSession]] backed by Hive.
*/
private[hive] class HiveSharedState(override val sparkContext: SparkContext)
- extends SharedState(sparkContext) with Logging {
+ extends SharedState(sparkContext) {
// TODO: just share the IsolatedClientLoader instead of the client instance itself
- {
- // Set the Hive metastore warehouse path to the one we use
- val tempConf = new SQLConf
- sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
- sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
- logInfo(s"Setting Hive metastore warehouse path to '${tempConf.warehousePath}'")
- }
-
/**
* A Hive client used to interact with the metastore.
*/
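With the Hive-specific block removed here, the warehouse setup lives entirely in the core `SharedState`, which locates `hive-site.xml` as a classpath resource (first hunk above). A `hive-site.xml` therefore only takes effect when the directory containing it is on the classpath, which is why the test below adds `hiveSiteDir` (not the file itself) to `spark.driver.extraClassPath`. A rough standalone sketch of that lookup, using the thread context class loader as an approximation of `Utils.getContextOrSparkClassLoader`:

```scala
val classLoader = Thread.currentThread().getContextClassLoader
val configFile = classLoader.getResource("hive-site.xml")
if (configFile != null) {
  // Found: its properties (e.g. hive.metastore.warehouse.dir) are merged
  // into the shared Hadoop configuration.
  println(s"hive-site.xml located at $configFile")
} else {
  println("hive-site.xml not on the classpath; Hadoop conf keeps defaults")
}
```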
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d56bede0cc..9bca720a94 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
-import java.io.File
+import java.io.{BufferedWriter, File, FileWriter}
import java.sql.Timestamp
import java.util.Date
@@ -205,7 +205,7 @@ class HiveSparkSubmitSuite
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
- "--name", "SetWarehouseLocationTest",
+ "--name", "SetSparkWarehouseLocationTest",
"--master", "local-cluster[2,1,1024]",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
@@ -214,6 +214,45 @@ class HiveSparkSubmitSuite
runSparkSubmit(args)
}
+ test("set hive.metastore.warehouse.dir") {
+ // In this test, we set hive.metastore.warehouse.dir in hive-site.xml but
+ // do not set spark.sql.warehouse.dir. So, the warehouse dir should be
+ // the value of hive.metastore.warehouse.dir. Also, the value of
+ // spark.sql.warehouse.dir should be set to the value of hive.metastore.warehouse.dir.
+
+ val hiveWarehouseLocation = Utils.createTempDir()
+ hiveWarehouseLocation.delete()
+ val hiveSiteXmlContent =
+ s"""
+ |<configuration>
+ | <property>
+ | <name>hive.metastore.warehouse.dir</name>
+ | <value>$hiveWarehouseLocation</value>
+ | </property>
+ |</configuration>
+ """.stripMargin
+
+ // Write a hive-site.xml containing a setting of hive.metastore.warehouse.dir.
+ val hiveSiteDir = Utils.createTempDir()
+ val file = new File(hiveSiteDir.getCanonicalPath, "hive-site.xml")
+ val bw = new BufferedWriter(new FileWriter(file))
+ bw.write(hiveSiteXmlContent)
+ bw.close()
+
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SetWarehouseLocationTest.getClass.getName.stripSuffix("$"),
+ "--name", "SetHiveWarehouseLocationTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--conf", s"spark.sql.test.expectedWarehouseDir=$hiveWarehouseLocation",
+ "--conf", s"spark.driver.extraClassPath=${hiveSiteDir.getCanonicalPath}",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -277,19 +316,43 @@ class HiveSparkSubmitSuite
object SetWarehouseLocationTest extends Logging {
def main(args: Array[String]): Unit = {
Utils.configTestLog4j("INFO")
- val warehouseLocation = Utils.createTempDir()
- warehouseLocation.delete()
- val hiveWarehouseLocation = Utils.createTempDir()
- hiveWarehouseLocation.delete()
- // We will use the value of spark.sql.warehouse.dir override the
- // value of hive.metastore.warehouse.dir.
- val sparkSession = SparkSession.builder()
+ val sparkConf = new SparkConf(loadDefaults = true)
+ val builder = SparkSession.builder()
+ .config(sparkConf)
.config("spark.ui.enabled", "false")
- .config("spark.sql.warehouse.dir", warehouseLocation.toString)
- .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
.enableHiveSupport()
- .getOrCreate()
+ val providedExpectedWarehouseLocation =
+ sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
+
+ val (sparkSession, expectedWarehouseLocation) = providedExpectedWarehouseLocation match {
+ case Some(warehouseDir) =>
+ // If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
+ // through spark-submit. So, neither spark.sql.warehouse.dir nor
+ // hive.metastore.warehouse.dir is set here.
+ (builder.getOrCreate(), warehouseDir)
+ case None =>
+ val warehouseLocation = Utils.createTempDir()
+ warehouseLocation.delete()
+ val hiveWarehouseLocation = Utils.createTempDir()
+ hiveWarehouseLocation.delete()
+ // If spark.sql.test.expectedWarehouseDir is not set, we will set
+ // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
+ // We are expecting that the value of spark.sql.warehouse.dir will override the
+ // value of hive.metastore.warehouse.dir.
+ val session = builder
+ .config("spark.sql.warehouse.dir", warehouseLocation.toString)
+ .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
+ .getOrCreate()
+ (session, warehouseLocation.toString)
+
+ }
+
+ if (sparkSession.conf.get("spark.sql.warehouse.dir") != expectedWarehouseLocation) {
+ throw new Exception(
+ "spark.sql.warehouse.dir is not set to the expected warehouse location " +
+ s"$expectedWarehouseLocation.")
+ }
val catalog = sparkSession.sessionState.catalog
@@ -301,7 +364,7 @@ object SetWarehouseLocationTest extends Logging {
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("default")))
val expectedLocation =
- "file:" + warehouseLocation.toString + "/testlocation"
+ "file:" + expectedWarehouseLocation.toString + "/testlocation"
val actualLocation = tableMetadata.storage.locationUri.get
if (actualLocation != expectedLocation) {
throw new Exception(
@@ -317,7 +380,7 @@ object SetWarehouseLocationTest extends Logging {
val tableMetadata =
catalog.getTableMetadata(TableIdentifier("testLocation", Some("testLocationDB")))
val expectedLocation =
- "file:" + warehouseLocation.toString + "/testlocationdb.db/testlocation"
+ "file:" + expectedWarehouseLocation.toString + "/testlocationdb.db/testlocation"
val actualLocation = tableMetadata.storage.locationUri.get
if (actualLocation != expectedLocation) {
throw new Exception(