aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Yin Huai <yhuai@databricks.com> 2015-06-17 14:52:43 -0700
committer: Michael Armbrust <michael@databricks.com> 2015-06-17 14:52:43 -0700
commit: 302556ff999ba9a1960281de6932e0d904197204 (patch)
tree: 6849efa772cb6af48df6ba1e0fb221b3e2e79c0a
parent: 7f05b1fe696daa28fee514c9aef805be5913cfcd (diff)
download: spark-302556ff999ba9a1960281de6932e0d904197204.tar.gz
spark-302556ff999ba9a1960281de6932e0d904197204.tar.bz2
spark-302556ff999ba9a1960281de6932e0d904197204.zip
[SPARK-8306] [SQL] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state.
https://issues.apache.org/jira/browse/SPARK-8306

I will try to add a test later.

marmbrus aarondav

Author: Yin Huai <yhuai@databricks.com>

Closes #6758 from yhuai/SPARK-8306 and squashes the following commits:

1292346 [Yin Huai] [SPARK-8306] AddJar command needs to set the new class loader to the HiveConf inside executionHive.state.
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala 8
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 12
-rw-r--r-- sql/hive/src/test/resources/hive-contrib-0.13.1.jar bin 0 -> 114878 bytes
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 28
4 files changed, 45 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 0fcba65ca6..982ed63874 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -95,6 +95,7 @@ private[hive] class ClientWrapper(
case hive.v14 => new Shim_v0_14()
}
+ // Create an internal session state for this ClientWrapper.
val state = {
val original = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
@@ -131,8 +132,15 @@ private[hive] class ClientWrapper(
*/
private def withHiveState[A](f: => A): A = synchronized {
val original = Thread.currentThread().getContextClassLoader
+ // This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
+ // internally override the context class loader of the current thread with the class loader
+ // associated with the HiveConf in `state`.
Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+ // Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
+ // Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
+ // with the HiveConf in `state` to override the context class loader of the current
+ // thread.
shim.setCurrentSessionState(state)
val ret = try f finally {
Thread.currentThread().setContextClassLoader(original)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 195e5752c3..aad58bfa2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand {
val jarURL = new java.io.File(path).toURL
val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
Thread.currentThread.setContextClassLoader(newClassLoader)
- org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
-
- // Add jar to isolated hive classloader
+ // We need to explicitly set the class loader associated with the conf in executionHive's
+ // state because this class loader will be used as the context class loader of the current
+ // thread to execute any Hive command.
+ // We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
+ // returns the value of a thread local variable and its HiveConf may not be the HiveConf
+ // associated with `executionHive.state` (for example, HiveContext is created in one thread
+ // and then add jar is called from another thread).
+ hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
+ // Add jar to isolated hive (metadataHive) class loader.
hiveContext.runSqlHive(s"ADD JAR $path")
// Add jar to executors
diff --git a/sql/hive/src/test/resources/hive-contrib-0.13.1.jar b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar
new file mode 100644
index 0000000000..ce0740d924
--- /dev/null
+++ b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar
Binary files differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f8908760cc..984d97d27b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -934,4 +934,32 @@ class SQLQuerySuite extends QueryTest {
sql("set hive.exec.dynamic.partition.mode=strict")
}
}
+
+ test("Call add jar in a different thread (SPARK-8306)") {
+ @volatile var error: Option[Throwable] = None
+ val thread = new Thread {
+ override def run() {
+ // To make sure this test works, this jar should not be loaded in another place.
+ TestHive.sql(
+ s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}")
+ try {
+ TestHive.sql(
+ """
+ |CREATE TEMPORARY FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ """.stripMargin)
+ } catch {
+ case throwable: Throwable =>
+ error = Some(throwable)
+ }
+ }
+ }
+ thread.start()
+ thread.join()
+ error match {
+ case Some(throwable) =>
+ fail("CREATE TEMPORARY FUNCTION should not fail.", throwable)
+ case None => // OK
+ }
+ }
}