author    Ryan Blue <blue@apache.org>    2016-03-16 22:57:06 -0700
committer Reynold Xin <rxin@databricks.com>    2016-03-16 22:57:06 -0700
commit    5faba9faccb5ce43790c43284769e0f890340606 (patch)
tree      f8cf4a677a7ed02cb7fb41f9df23e468e82735a5
parent    de1a84e56e81347cb0d1ec67cc86944ea98bb9a9 (diff)
[SPARK-13403][SQL] Pass hadoopConfiguration to HiveConf constructors.
This commit updates the HiveContext so that sc.hadoopConfiguration is used to instantiate its internal instances of HiveConf.

I tested this by overriding the S3 FileSystem implementation from spark-defaults.conf as "spark.hadoop.fs.s3.impl" (to avoid [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810)).

Author: Ryan Blue <blue@apache.org>

Closes #11273 from rdblue/SPARK-13403-new-hive-conf-from-hadoop-conf.
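To see what the new constructor call buys, here is a minimal standalone sketch (not part of the patch; the fs.s3.impl value is purely illustrative): entries set on a Hadoop Configuration, such as an override arriving through a spark.hadoop.* property, are inherited by a HiveConf built with the two-argument constructor, whereas a bare `new HiveConf()` starts from defaults. The "hadoop configuration preserved" test added in VersionsSuite below exercises the same behavior end to end.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

// Minimal sketch, not part of the patch; the fs.s3.impl value is illustrative.
object HiveConfPropagationSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Simulate an override that arrived through a spark.hadoop.* property.
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    // new HiveConf() would not see the override; the two-argument
    // constructor inherits the entries from hadoopConf.
    val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
    assert(hiveConf.get("fs.s3.impl") == "org.apache.hadoop.fs.s3a.S3AFileSystem")
  }
}
```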
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala      |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala            |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala        | 17
5 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 05fc569588..4238ad1ad4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
sparkConf = sc.conf,
execJars = Seq(),
+ hadoopConf = sc.hadoopConfiguration,
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
baseClassLoader = Utils.getContextOrSparkClassLoader)
@@ -239,7 +240,7 @@ class HiveContext private[hive](
// We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
// into the isolated client loader
- val metadataConf = new HiveConf()
+ val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf])
val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir")
logInfo("default warehouse location is " + defaultWarehouseLocation)
@@ -279,6 +280,7 @@ class HiveContext private[hive](
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
+ hadoopConf = sc.hadoopConfiguration,
config = allConfig,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
@@ -291,6 +293,7 @@ class HiveContext private[hive](
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sc.conf,
+ hadoopConf = sc.hadoopConfiguration,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -320,6 +323,7 @@ class HiveContext private[hive](
version = metaVersion,
sparkConf = sc.conf,
execJars = jars.toSeq,
+ hadoopConf = sc.hadoopConfiguration,
config = allConfig,
isolationOn = true,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3040ec93f8..a5f0bbf678 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -22,6 +22,7 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
@@ -62,6 +63,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
private[hive] class HiveClientImpl(
override val version: HiveVersion,
sparkConf: SparkConf,
+ hadoopConf: Configuration,
config: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
@@ -115,7 +117,7 @@ private[hive] class HiveClientImpl(
// so we should keep `conf` and reuse the existing instance of `CliSessionState`.
originalState
} else {
- val initialConf = new HiveConf(classOf[SessionState])
+ val initialConf = new HiveConf(hadoopConf, classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
// the initial value will be the current thread's context class loader
// (i.e. initClassLoader at here).
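A hedged sketch of the constructor change above (the names mirror the hunk; nothing beyond what the diff shows is assumed): the client's session configuration now starts from the caller's hadoopConf rather than from an empty Configuration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

// Sketch only, mirroring the patched line in HiveClientImpl.
object ClientConfSketch {
  // HiveConf is itself a Hadoop Configuration, so seeding it from the
  // caller's hadoopConf preserves settings such as filesystem overrides.
  // classOf[SessionState] matches the class passed in the hunk above.
  def newClientConf(hadoopConf: Configuration): HiveConf =
    new HiveConf(hadoopConf, classOf[SessionState])
}
```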
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 024f4dfeba..932402a5f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -26,6 +26,7 @@ import scala.language.reflectiveCalls
import scala.util.Try
import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkSubmitUtils
@@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging {
hiveMetastoreVersion: String,
hadoopVersion: String,
sparkConf: SparkConf,
+ hadoopConf: Configuration,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
@@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging {
hiveVersion(hiveMetastoreVersion),
sparkConf,
execJars = files,
+ hadoopConf = hadoopConf,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
sharedPrefixes = sharedPrefixes,
@@ -149,6 +152,7 @@ private[hive] object IsolatedClientLoader extends Logging {
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
val sparkConf: SparkConf,
+ val hadoopConf: Configuration,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
@@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
- return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
+ return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -249,7 +253,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
- .newInstance(version, sparkConf, config, classLoader, this)
+ .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
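The createClient() hunks above use a reflective-construction pattern worth calling out: when isolation is on, HiveClientImpl must be instantiated inside the isolated class loader, so it is loaded by name and constructed reflectively, which is why the new hadoopConf argument has to be threaded through both call sites in the same position. A minimal sketch of the pattern, with illustrative names and assuming only standard java.lang.reflect behavior:

```scala
import java.lang.reflect.InvocationTargetException

// Sketch only; ReflectiveConstructionSketch and construct are illustrative names.
object ReflectiveConstructionSketch {
  // Load a class inside the given (possibly isolated) class loader and
  // invoke its first constructor, so the caller never links against the
  // isolated copy directly. Reflection wraps constructor failures in
  // InvocationTargetException; unwrap to surface the real cause.
  def construct(loader: ClassLoader, className: String, args: AnyRef*): AnyRef = {
    try {
      loader
        .loadClass(className)
        .getConstructors.head
        .newInstance(args: _*)
    } catch {
      case e: InvocationTargetException => throw e.getCause
    }
  }
}
```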
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index 0dc4fea22d..427f5747a0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.SparkConf
@@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
- sparkConf = new SparkConf()).createClient()
+ sparkConf = new SparkConf(),
+ hadoopConf = new Configuration()).createClient()
}
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3d54da11ad..f218ab80a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
import java.io.File
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -63,12 +64,26 @@ class VersionsSuite extends SparkFunSuite with Logging {
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
+ hadoopConf = new Configuration(),
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new CatalogDatabase("default", "desc", "loc", Map())
badClient.createDatabase(db, ignoreIfExists = true)
}
+ test("hadoop configuration preserved") {
+ val hadoopConf = new Configuration();
+ hadoopConf.set("test", "success")
+ val client = IsolatedClientLoader.forVersion(
+ hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sparkConf,
+ hadoopConf = hadoopConf,
+ config = buildConf(),
+ ivyPath = ivyPath).createClient()
+ assert("success" === client.getConf("test", null))
+ }
+
private def getNestedMessages(e: Throwable): String = {
var causes = ""
var lastException = e
@@ -98,6 +113,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
+ hadoopConf = new Configuration(),
config = buildConf(),
ivyPath = ivyPath).createClient()
}
@@ -118,6 +134,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = sparkConf,
+ hadoopConf = new Configuration(),
config = buildConf(),
ivyPath = ivyPath).createClient()
}