author    Cheng Lian <lian@databricks.com>    2014-11-01 15:03:11 -0700
committer Michael Armbrust <michael@databricks.com>    2014-11-01 15:03:11 -0700
commit    ad0fde10b2285e780349be5a8f333db0974a502f (patch)
tree      4b870113fdb6d47ae4405edd3b8bd5d65d933e49 /sql/hive
parent    f55218aeb1e9d638df6229b36a59a15ce5363482 (diff)
[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2
`HiveThriftServer2` creates a global singleton `SessionState` instance and overrides `HiveContext` to inject that `SessionState` object. This messes up `SessionState` initialization and causes problems.

This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. Also, `HiveContext` now reuses an existing started `SessionState` if there is one (this is required by `SparkSQLCLIDriver`, which uses the specialized `CliSessionState`).

Author: Cheng Lian <lian@databricks.com>

Closes #2887 from liancheng/spark-4037 and squashes the following commits:

8446675 [Cheng Lian] Removes redundant Driver initialization
a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times
49b1c5b [Cheng Lian] Reuses existing started SessionState if any
3cd6fab [Cheng Lian] Fixes SPARK-4037
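For context, a minimal sketch of the reuse pattern the description refers to is shown below: pick up the `SessionState` returned by `SessionState.get()` if one has already been started (e.g. the `CliSessionState` created by `SparkSQLCLIDriver`), otherwise create and start a fresh one. The object and helper names (`SessionStateSketch`, `currentOrNewSessionState`, `outputBuffer`) are illustrative, not part of the patch.

```scala
import java.io.{OutputStream, PrintStream}

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

// Illustrative helper, not part of the patch.
object SessionStateSketch {
  def currentOrNewSessionState(outputBuffer: OutputStream): SessionState = {
    // Reuse an already-started SessionState if the current thread has one.
    val state = Option(SessionState.get()).getOrElse {
      val newState = new SessionState(new HiveConf(classOf[SessionState]))
      // Only a newly created SessionState needs SessionState.start(); an
      // instance returned by SessionState.get() is already the active one.
      SessionState.start(newState)
      newState
    }
    // Redirect Hive's output/error streams only if they are not set yet,
    // so an existing CliSessionState keeps its own console streams.
    if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
    if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
    state
  }
}
```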
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  44
1 file changed, 30 insertions(+), 14 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091d48..ff8fa44194 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
/**
- * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
- * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
- * set() or a SET command inside sql() will be set in the SQLConf *as well as*
- * in the HiveConf.
+ * SQLConf and HiveConf contracts:
+ *
+ * 1. reuse existing started SessionState if any
+ * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+ * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+ * set in the SQLConf *as well as* in the HiveConf.
*/
- @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
- @transient protected[hive] lazy val sessionState = {
- val ss = new SessionState(hiveconf)
- setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
- SessionState.start(ss)
- ss.err = new PrintStream(outputBuffer, true, "UTF-8")
- ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
- ss
- }
+ @transient protected[hive] lazy val (hiveconf, sessionState) =
+ Option(SessionState.get())
+ .orElse {
+ val newState = new SessionState(new HiveConf(classOf[SessionState]))
+ // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+ // returned by `SessionState.get()` must be the most recently started one.
+ SessionState.start(newState)
+ Some(newState)
+ }
+ .map { state =>
+ setConf(state.getConf.getAllProperties)
+ if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+ if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+ (state.getConf, state)
+ }
+ .get
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
+ // Makes sure the session represented by the `sessionState` field is activated. This implies
+ // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+ // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+ // TODO Fix session isolation
+ if (SessionState.get() != sessionState) {
+ SessionState.start(sessionState)
+ }
+
proc match {
case driver: Driver =>
val results = HiveShim.createDriverResultsArray