author    Yin Huai <yhuai@databricks.com>    2016-04-29 17:07:15 -0700
committer Andrew Or <andrew@databricks.com>  2016-04-29 17:07:15 -0700
commit    b33d6b72886db35ea042e29a8c08cd73bf9d4b0c
tree      a0b9f44f670c2c0d841a5bfea69c2129e42e4bb1 /sql
parent    dcfaeadea7e0013af98de626dec36306325f73e7
[SPARK-15019][SQL] Propagate all Spark Confs to HiveConf created in HiveClientImpl
## What changes were proposed in this pull request?
This PR makes two changes:
1. We now propagate Spark confs to the HiveConf created in HiveClientImpl, so users can also use Spark confs to set the warehouse location and the metastore URL (see the sketch after this list).
2. In sql/hive, HiveClientImpl is now the only place where we create a new HiveConf.
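As a user-level illustration of change 1, Hive options set on SparkConf now reach the HiveConf built inside HiveClientImpl. This is a minimal sketch; the warehouse path and metastore URI below are placeholder values, not defaults:

```scala
import org.apache.spark.SparkConf

// With this change, Hive options set on SparkConf are propagated to the
// HiveConf created in HiveClientImpl. Both values below are placeholders.
val conf = new SparkConf()
  .set("hive.metastore.warehouse.dir", "/tmp/my-warehouse")  // placeholder path
  .set("hive.metastore.uris", "thrift://localhost:9083")     // placeholder URI
```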
## How was this patch tested?
Existing tests.
Author: Yin Huai <yhuai@databricks.com>
Closes #12791 from yhuai/onlyUseHiveConfInHiveClientImpl.
Diffstat (limited to 'sql')
7 files changed, 52 insertions, 40 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a8561192ed..be89edbad7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -178,7 +178,7 @@ private[spark] object HiveUtils extends Logging {
   /**
    * Configurations needed to create a [[HiveClient]].
    */
-  private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = {
+  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
     // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
     // compatibility when users are trying to connecting to a Hive metastore of lower version,
@@ -227,7 +227,7 @@ private[spark] object HiveUtils extends Logging {
       ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS,
       ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS
     ).map { case (confVar, unit) =>
-      confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString
+      confVar.varname -> HiveConf.getTimeVar(hadoopConf, confVar, unit).toString
     }.toMap
   }
 
@@ -264,14 +264,12 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClient = {
-    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
-    val configurations = hiveClientConfigurations(hiveConf)
-    newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
+    val configurations = hiveClientConfigurations(hadoopConf)
+    newClientForMetadata(conf, hadoopConf, configurations)
   }
 
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
-      hiveConf: HiveConf,
       hadoopConf: Configuration,
       configurations: Map[String, String]): HiveClient = {
     val sqlConf = new SQLConf
@@ -282,12 +280,6 @@ private[spark] object HiveUtils extends Logging {
     val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
     val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
 
-    val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir")
-    logInfo("default warehouse location is " + defaultWarehouseLocation)
-
-    // `configure` goes second to override other settings.
-    val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations
-
     val isolatedLoader = if (hiveMetastoreJars == "builtin") {
       if (hiveExecutionVersion != hiveMetastoreVersion) {
         throw new IllegalArgumentException(
@@ -321,7 +313,7 @@ private[spark] object HiveUtils extends Logging {
         sparkConf = conf,
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
-        config = allConfig,
+        config = configurations,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -334,7 +326,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopVersion = VersionInfo.getVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        config = allConfig,
+        config = configurations,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else {
@@ -364,7 +356,7 @@ private[spark] object HiveUtils extends Logging {
         sparkConf = conf,
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
-        config = allConfig,
+        config = configurations,
         isolationOn = true,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index df6abc258b..d044811052 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHadoopConf
+    val broadcastedHadoopConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -119,7 +119,7 @@ class HadoopTableReader(
     val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
 
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
-      val hconf = broadcastedHiveConf.value.value
+      val hconf = broadcastedHadoopConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
       HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c98eaa0d15..78ba2bfda6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -56,9 +56,17 @@ import org.apache.spark.util.{CircularBuffer, Utils}
  * the 'native', execution version of Hive. Therefore, any places where hive breaks compatibility
  * must use reflection after matching on `version`.
  *
+ * Every HiveClientImpl creates an internal HiveConf object. This object is using the given
+ * `hadoopConf` as the base. All options set in the `sparkConf` will be applied to the HiveConf
+ * object and overrides any exiting options. Then, options in extraConfig will be applied
+ * to the HiveConf object and overrides any existing options.
+ *
  * @param version the version of hive used when pick function calls that are not compatible.
- * @param config a collection of configuration options that will be added to the hive conf before
- *               opening the hive client.
+ * @param sparkConf all configuration options set in SparkConf.
+ * @param hadoopConf the base Configuration object used by the HiveConf created inside
+ *                   this HiveClientImpl.
+ * @param extraConfig a collection of configuration options that will be added to the
+ *                    hive conf before opening the hive client.
  * @param initClassLoader the classloader used when creating the `state` field of
  *                        this [[HiveClientImpl]].
  */
@@ -66,7 +74,7 @@ private[hive] class HiveClientImpl(
     override val version: HiveVersion,
     sparkConf: SparkConf,
     hadoopConf: Configuration,
-    config: Map[String, String],
+    extraConfig: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
   extends HiveClient
@@ -129,22 +137,32 @@ private[hive] class HiveClientImpl(
       // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
       originalState
     } else {
-      val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
+      val hiveConf = new HiveConf(hadoopConf, classOf[SessionState])
       // HiveConf is a Hadoop Configuration, which has a field of classLoader and
       // the initial value will be the current thread's context class loader
       // (i.e. initClassLoader at here).
       // We call initialConf.setClassLoader(initClassLoader) at here to make
       // this action explicit.
-      initialConf.setClassLoader(initClassLoader)
-      config.foreach { case (k, v) =>
+      hiveConf.setClassLoader(initClassLoader)
+      // First, we set all spark confs to this hiveConf.
+      sparkConf.getAll.foreach { case (k, v) =>
+        if (k.toLowerCase.contains("password")) {
+          logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
+        } else {
+          logDebug(s"Applying Spark config to Hive Conf: $k=$v")
+        }
+        hiveConf.set(k, v)
+      }
+      // Second, we set all entries in config to this hiveConf.
+      extraConfig.foreach { case (k, v) =>
         if (k.toLowerCase.contains("password")) {
-          logDebug(s"Hive Config: $k=xxx")
+          logDebug(s"Applying extra config to HiveConf: $k=xxx")
         } else {
-          logDebug(s"Hive Config: $k=$v")
+          logDebug(s"Applying extra config to HiveConf: $k=$v")
         }
-        initialConf.set(k, v)
+        hiveConf.set(k, v)
       }
-      val state = new SessionState(initialConf)
+      val state = new SessionState(hiveConf)
       if (clientLoader.cachedHive != null) {
         Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
       }
@@ -159,10 +177,13 @@ private[hive] class HiveClientImpl(
     ret
   }
 
+  // Log the default warehouse location.
+  logInfo(
+    s"Default warehouse location for Hive client " +
+      s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+
   /** Returns the configuration for the current session. */
-  // TODO: We should not use it because HiveSessionState has a hiveconf
-  // for the current Session.
-  def conf: HiveConf = SessionState.get().getConf
+  def conf: HiveConf = state.getConf
 
   override def getConf(key: String, defaultValue: String): String = {
     conf.get(key, defaultValue)
@@ -212,7 +233,7 @@ private[hive] class HiveClientImpl(
     false
   }
 
-  def client: Hive = {
+  private def client: Hive = {
     if (clientLoader.cachedHive != null) {
       clientLoader.cachedHive.asInstanceOf[Hive]
     } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index b52b96a804..e29864f996 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -73,7 +73,7 @@ case class HiveTableScanExec(
     BindReferences.bindReference(pred, relation.partitionKeys)
   }
 
-  // Create a local copy of hiveconf,so that scan specific modifications should not impact
+  // Create a local copy of hadoopConf,so that scan specific modifications should not impact
   // other queries
   @transient
   private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index e763b63380..93646a45a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -561,23 +561,21 @@ private[hive] object TestHiveContext {
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): HiveClient = {
-    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     HiveUtils.newClientForMetadata(
       conf,
-      hiveConf,
       hadoopConf,
-      hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
+      hiveClientConfigurations(hadoopConf, warehousePath, scratchDirPath, metastoreTemporaryConf))
   }
 
   /**
    * Configurations needed to create a [[HiveClient]].
    */
   def hiveClientConfigurations(
-      hiveconf: HiveConf,
+      hadoopConf: Configuration,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]): Map[String, String] = {
-    HiveUtils.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map(
+    HiveUtils.hiveClientConfigurations(hadoopConf) ++ metastoreTemporaryConf ++ Map(
       ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString,
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
       ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 46de4921f6..baf34d1cf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -111,7 +111,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   test("SPARK-4203:random partition directory order") {
     sql("CREATE TABLE tmp_table (key int, value string)")
     val tmpDir = Utils.createTempDir()
-    val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+    // The default value of hive.exec.stagingdir.
+    val stagingDir = ".hive-staging"
 
     sql(
       s"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index d78914505a..af4dc1beec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
   /**
    * Set the staging directory (and hence path to ignore Parquet files under)
-   * to that set by [[HiveConf.ConfVars.STAGINGDIR]].
+   * to the default value of hive.exec.stagingdir.
    */
-  private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+  private val stagingDir = ".hive-staging"
 
   override protected def logParquetSchema(path: String): Unit = {
     val schema = readParquetSchema(path, { path =>
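For context on the override order documented in the HiveClientImpl scaladoc above (the given hadoopConf as the base, then sparkConf entries, then extraConfig entries), here is a minimal sketch in plain Scala that models the precedence with Maps instead of real HiveConf objects; all keys and values are hypothetical:

```scala
// Models the precedence HiveClientImpl now applies when building its internal
// HiveConf: hadoopConf first, then sparkConf, then extraConfig, with later
// writes overriding earlier ones. Map's `++` keeps the right-hand value on
// key collisions, mirroring repeated HiveConf.set calls.
object ConfLayeringSketch extends App {
  val hadoopConf  = Map("hive.metastore.warehouse.dir" -> "/from-hadoop-conf")
  val sparkConf   = Map("hive.metastore.warehouse.dir" -> "/from-spark-conf")
  val extraConfig = Map("hive.metastore.warehouse.dir" -> "/from-extra-config")

  val effective = hadoopConf ++ sparkConf ++ extraConfig
  println(effective("hive.metastore.warehouse.dir"))  // prints /from-extra-config
}
```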