about summary refs log tree commit diff
path: root/sql/hive
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-05-17 12:43:15 -0700
committerMichael Armbrust <michael@databricks.com>2015-05-17 12:43:15 -0700
commit2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c (patch)
tree8f365657d8032ac2e3671487ff87e2feece260ac /sql/hive
parent564562874f589c4c8bcabcd9d6eb9a6b0eada938 (diff)
downloadspark-2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c.tar.gz
spark-2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c.tar.bz2
spark-2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c.zip
[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive
Author: Michael Armbrust <michael@databricks.com>

Closes #6167 from marmbrus/configureIsolation and squashes the following commits:
6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader isolation for hive
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 33
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 14
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 9
3 files changed, 46 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9d98c36e94..2733ebdb95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[hive] def hiveMetastoreJars: String =
getConf(HIVE_METASTORE_JARS, "builtin")
+ /**
+ * A comma separated list of class prefixes that should be loaded using the classloader that
+ * is shared between Spark SQL and a specific version of Hive. An example of classes that should
+ * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
+ * to be shared are those that interact with classes that are already shared. For example,
+ * custom appenders that are used by log4j.
+ */
+ protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
+ getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
+ .split(",").filterNot(_ == "")
+
+ private def jdbcPrefixes = Seq(
+ "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
+
+ /**
+ * A comma separated list of class prefixes that should explicitly be reloaded for each version
+ * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
+ * prefix that typically would be shared (i.e. org.apache.spark.*)
+ */
+ protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
+ getConf("spark.sql.hive.metastore.barrierPrefixes", "")
+ .split(",").filterNot(_ == "")
+
@transient
protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
version = metaVersion,
execJars = jars.toSeq,
config = allConfig,
- isolationOn = true)
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
// TODO: Support for loading the jars from an already downloaded location.
logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
- IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
+ IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
} else {
// Convert to files and expand any directories.
val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
version = metaVersion,
execJars = jars.toSeq,
config = allConfig,
- isolationOn = true)
+ isolationOn = true,
+ barrierPrefixes = hiveMetastoreBarrierPrefixes,
+ sharedPrefixes = hiveMetastoreSharedPrefixes)
}
isolatedLoader.client
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 7f94c93ba4..196a3d836c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
.map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
"com.google.guava:guava:14.0.1" :+
- "org.apache.hadoop:hadoop-client:2.4.0" :+
- "mysql:mysql-connector-java:5.1.12"
+ "org.apache.hadoop:hadoop-client:2.4.0"
val classpath = quietly {
SparkSubmitUtils.resolveMavenCoordinates(
@@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
- val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
+ val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
+ val sharedPrefixes: Seq[String] = Seq.empty,
+ val barrierPrefixes: Seq[String] = Seq.empty)
extends Logging {
// Check to make sure that the root classloader does not know about Hive.
@@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
name.startsWith("scala.") ||
name.startsWith("com.google") ||
name.startsWith("java.lang.") ||
- name.startsWith("java.net")
+ name.startsWith("java.net") ||
+ sharedPrefixes.exists(name.startsWith)
/** True if `name` refers to a spark class that must see specific version of Hive. */
protected def isBarrierClass(name: String): Boolean =
- name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
name.startsWith(classOf[ClientWrapper].getName) ||
- name.startsWith(classOf[ReflectionMagic].getName)
+ name.startsWith(classOf[ReflectionMagic].getName) ||
+ barrierPrefixes.exists(name.startsWith)
protected def classToPath(name: String): String =
name.replaceAll("\\.", "/") + ".class"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1598d4bd47..9648284074 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
// SPARK-3729: Test key required to check for initialization errors with config.
object TestHive
extends TestHiveContext(
- new SparkContext("local[2]", "TestSQLContext", new SparkConf().set("spark.sql.test", "")))
+ new SparkContext(
+ "local[2]",
+ "TestSQLContext",
+ new SparkConf()
+ .set("spark.sql.test", "")
+ .set(
+ "spark.sql.hive.metastore.barrierPrefixes",
+ "org.apache.spark.sql.hive.execution.PairSerDe")))
/**
* A locally running test instance of Spark's Hive execution engine.