author     Yin Huai <yhuai@databricks.com>    2016-04-29 17:07:15 -0700
committer  Andrew Or <andrew@databricks.com>  2016-04-29 17:07:15 -0700
commit     b33d6b72886db35ea042e29a8c08cd73bf9d4b0c (patch)
tree       a0b9f44f670c2c0d841a5bfea69c2129e42e4bb1 /sql
parent     dcfaeadea7e0013af98de626dec36306325f73e7 (diff)
[SPARK-15019][SQL] Propagate all Spark Confs to HiveConf created in HiveClientImpl
## What changes were proposed in this pull request?

This PR makes two changes:

1. We will propagate Spark confs to the HiveConf created in HiveClientImpl, so users can also use Spark conf to set the warehouse location and the metastore URL.
2. In sql/hive, HiveClientImpl will be the only place where we create a new HiveConf.

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12791 from yhuai/onlyUseHiveConfInHiveClientImpl.
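As a quick illustration (not part of the patch; the warehouse path and metastore URI below are placeholder values): with confs propagated, Hive metastore settings placed in SparkConf reach the HiveConf that HiveClientImpl builds, so a sketch like the following should be enough to set the warehouse location and metastore URL from the Spark side:

```scala
// Illustrative sketch only: Hive settings supplied through SparkConf are now
// propagated into the HiveConf created inside HiveClientImpl.
// The warehouse path and metastore URI are placeholder values.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-conf-to-hive-conf")
  .enableHiveSupport()
  .config("hive.metastore.warehouse.dir", "/tmp/my-warehouse")    // warehouse location via Spark conf
  .config("hive.metastore.uris", "thrift://metastore-host:9083")  // metastore URL via Spark conf
  .getOrCreate()
```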
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala                     | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                   |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala         | 49
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala   |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                 |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala      |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala |  4
7 files changed, 52 insertions, 40 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a8561192ed..be89edbad7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -178,7 +178,7 @@ private[spark] object HiveUtils extends Logging {
/**
* Configurations needed to create a [[HiveClient]].
*/
- private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
+ private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
// compatibility when users are trying to connecting to a Hive metastore of lower version,
@@ -227,7 +227,7 @@ private[spark] object HiveUtils extends Logging {
ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
).map { case (confVar, unit) =>
- confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+ confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, unit).toString
}.toMap
}
@@ -264,14 +264,12 @@ private[spark] object HiveUtils extends Logging {
protected[hive] def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration): HiveClient = {
- val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
- val configurations = hiveClientConfigurations(hiveConf)
- newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+ val configurations = hiveClientConfigurations(hadoopConf)
+ newClientForMetadata(conf, hadoopConf, configurations)
}
protected[hive] def newClientForMetadata(
conf: SparkConf,
- hiveConf: HiveConf,
hadoopConf: Configuration,
configurations: Map[String, String]): HiveClient = {
val sqlConf = new SQLConf
@@ -282,12 +280,6 @@ private[spark] object HiveUtils extends Logging {
val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
- val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
- logInfo("default warehouse location is " + defaultWarehouseLocation)
-
- // `configure` goes second to override other settings.
- val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
-
val isolatedLoader = if (hiveMetastoreJars == "builtin") {
if (hiveExecutionVersion != hiveMetastoreVersion) {
throw new IllegalArgumentException(
@@ -321,7 +313,7 @@ private[spark] object HiveUtils extends Logging {
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
- config = allConfig,
+ config = configurations,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -334,7 +326,7 @@ private[spark] object HiveUtils extends Logging {
hadoopVersion = VersionInfo.getVersion,
sparkConf = conf,
hadoopConf = hadoopConf,
- config = allConfig,
+ config = configurations,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else {
@@ -364,7 +356,7 @@ private[spark] object HiveUtils extends Logging {
sparkConf = conf,
hadoopConf = hadoopConf,
execJars = jars.toSeq,
- config = allConfig,
+ config = configurations,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
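For reference, the rewrite above leans on HiveConf's static `getTimeVar` helper (already used in the hunk), which reads a time-based `ConfVar` directly from a plain Hadoop `Configuration`, so no intermediate HiveConf instance is needed. A minimal standalone sketch, using one of the timeouts listed above as the example key:

```scala
// Minimal sketch of the pattern from the hunk above: a time-based ConfVar is
// read straight from a Hadoop Configuration via HiveConf's static helper.
import java.util.concurrent.TimeUnit

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

val hadoopConf = new Configuration()
val connectTimeoutMs: Long =
  HiveConf.getTimeVar(hadoopConf, ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)
```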
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index df6abc258b..d044811052 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -105,7 +105,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val tableDesc = relation.tableDesc
- val broadcastedHiveConf = _broadcastedHadoopConf
+ val broadcastedHadoopConf = _broadcastedHadoopConf
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -119,7 +119,7 @@ class HadoopTableReader(
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
- val hconf = broadcastedHiveConf.value.value
+ val hconf = broadcastedHadoopConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c98eaa0d15..78ba2bfda6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -56,9 +56,17 @@ import org.apache.spark.util.{CircularBuffer, Utils}
* the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
* must use reflection after matching on `version`.
*
+ * Every HiveClientImpl creates an internal HiveConf object. This object uses the given
+ * `hadoopConf` as its base. All options set in `sparkConf` will be applied to the HiveConf
+ * object and override any existing options. Then, options in `extraConfig` will be applied
+ * to the HiveConf object and override any existing options.
+ *
* @param version the version of hive used when pick function calls that are not compatible.
- * @param config a collection of configuration options that will be added to the hive conf before
- * opening the hive client.
+ * @param sparkConf all configuration options set in SparkConf.
+ * @param hadoopConf the base Configuration object used by the HiveConf created inside
+ * this HiveClientImpl.
+ * @param extraConfig a collection of configuration options that will be added to the
+ * hive conf before opening the hive client.
* @param initClassLoader the classloader used when creating the `state` field of
* this [[HiveClientImpl]].
*/
@@ -66,7 +74,7 @@ private[hive] class HiveClientImpl(
override val version: HiveVersion,
sparkConf: SparkConf,
hadoopConf: Configuration,
- config: Map[String, String],
+ extraConfig: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
extends HiveClient
@@ -129,22 +137,32 @@ private[hive] class HiveClientImpl(
// so we should keep `conf` and reuse the existing instance of `CliSessionState`.
originalState
} else {
- val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
+ val hiveConf = new HiveConf(hadoopConf, classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
// the initial value will be the current thread's context class loader
// (i.e. initClassLoader at here).
// We call initialConf.setClassLoader(initClassLoader) at here to make
// this action explicit.
- initialConf.setClassLoader(initClassLoader)
- config.foreach { case (k, v) =>
+ hiveConf.setClassLoader(initClassLoader)
+ // First, we set all spark confs to this hiveConf.
+ sparkConf.getAll.foreach { case (k, v) =>
+ if (k.toLowerCase.contains("password")) {
+ logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
+ } else {
+ logDebug(s"Applying Spark config to Hive Conf: $k=$v")
+ }
+ hiveConf.set(k, v)
+ }
+ // Second, we set all entries in config to this hiveConf.
+ extraConfig.foreach { case (k, v) =>
if (k.toLowerCase.contains("password")) {
- logDebug(s"Hive Config: $k=xxx")
+ logDebug(s"Applying extra config to HiveConf: $k=xxx")
} else {
- logDebug(s"Hive Config: $k=$v")
+ logDebug(s"Applying extra config to HiveConf: $k=$v")
}
- initialConf.set(k, v)
+ hiveConf.set(k, v)
}
- val state = new SessionState(initialConf)
+ val state = new SessionState(hiveConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
@@ -159,10 +177,13 @@ private[hive] class HiveClientImpl(
ret
}
+ // Log the default warehouse location.
+ logInfo(
+ s"Default warehouse location for Hive client " +
+ s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+
/** Returns the configuration for the current session. */
- // TODO: We should not use it because HiveSessionState has a hiveconf
- // for the current Session.
- def conf: HiveConf = SessionState.get().getConf
+ def conf: HiveConf = state.getConf
override def getConf(key: String, defaultValue: String): String = {
conf.get(key, defaultValue)
@@ -212,7 +233,7 @@ private[hive] class HiveClientImpl(
false
}
- def client: Hive = {
+ private def client: Hive = {
if (clientLoader.cachedHive != null) {
clientLoader.cachedHive.asInstanceOf[Hive]
} else {
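Condensed from the hunk above, the precedence the new code establishes when building the client's HiveConf, as a standalone sketch (the helper name `buildHiveConf` is ours, not part of the patch): start from `hadoopConf`, overlay every SparkConf entry, then overlay `extraConfig`.

```scala
// Sketch of the layering performed in HiveClientImpl (helper name is illustrative):
// hadoopConf is the base; SparkConf entries override it; extraConfig overrides both.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.SparkConf

def buildHiveConf(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    extraConfig: Map[String, String]): HiveConf = {
  val hiveConf = new HiveConf(hadoopConf, classOf[SessionState])
  sparkConf.getAll.foreach { case (k, v) => hiveConf.set(k, v) }  // Spark confs win over hadoopConf
  extraConfig.foreach { case (k, v) => hiveConf.set(k, v) }       // extraConfig wins over everything
  hiveConf
}
```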
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index b52b96a804..e29864f996 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -73,7 +73,7 @@ case class HiveTableScanExec(
BindReferences.bindReference(pred, relation.partitionKeys)
}
- // Create a local copy of hiveconf,so that scan specific modifications should not impact
+ // Create a local copy of hadoopConf, so that scan-specific modifications do not impact
// other queries
@transient
private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index e763b63380..93646a45a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -561,23 +561,21 @@ private[hive] object TestHiveContext {
warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]): HiveClient = {
- val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
HiveUtils.newClientForMetadata(
conf,
- hiveConf,
hadoopConf,
- hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
+ hiveClientConfigurations(hadoopConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
}
/**
* Configurations needed to create a [[HiveClient]].
*/
def hiveClientConfigurations(
- hiveconf: HiveConf,
+ hadoopConf: Configuration,
warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
- HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+ HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 46de4921f6..baf34d1cf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -111,7 +111,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
test("SPARK-4203:random partition directory order") {
sql("CREATE TABLE tmp_table (key int, value string)")
val tmpDir = Utils.createTempDir()
- val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+ // The default value of hive.exec.stagingdir.
+ val stagingDir = ".hive-staging"
sql(
s"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index d78914505a..af4dc1beec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.internal.SQLConf
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
/**
* Set the staging directory (and hence path to ignore Parquet files under)
- * to that set by [[HiveConf.ConfVars.STAGINGDIR]].
+ * to the default value of hive.exec.stagingdir.
*/
- private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+ private val stagingDir = ".hive-staging"
override protected def logParquetSchema(path: String): Unit = {
val schema = readParquetSchema(path, { path =>