From ad0fde10b2285e780349be5a8f333db0974a502f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 1 Nov 2014 15:03:11 -0700 Subject: [SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2 `HiveThriftServer2` creates a global singleton `SessionState` instance and overrides `HiveContext` to inject the `SessionState` object. This messes up `SessionState` initialization and causes problems. This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. Also `HiveContext` reuses existing started `SessionState` if any (this is required by `SparkSQLCLIDriver`, which uses specialized `CliSessionState`). Author: Cheng Lian Closes #2887 from liancheng/spark-4037 and squashes the following commits: 8446675 [Cheng Lian] Removes redundant Driver initialization a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times 49b1c5b [Cheng Lian] Reuses existing started SessionState if any 3cd6fab [Cheng Lian] Fixes SPARK-4037 --- .../org/apache/spark/sql/hive/HiveContext.scala | 44 +++++++++++++++------- 1 file changed, 30 insertions(+), 14 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index fad4091d48..ff8fa44194 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } /** - * SQLConf and HiveConf contracts: when the hive session is first initialized, params in - * HiveConf will get picked up by the SQLConf. Additionally, any properties set by - * set() or a SET command inside sql() will be set in the SQLConf *as well as* - * in the HiveConf. + * SQLConf and HiveConf contracts: + * + * 1. reuse existing started SessionState if any + * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the + * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be + * set in the SQLConf *as well as* in the HiveConf. */ - @transient lazy val hiveconf = new HiveConf(classOf[SessionState]) - @transient protected[hive] lazy val sessionState = { - val ss = new SessionState(hiveconf) - setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. - SessionState.start(ss) - ss.err = new PrintStream(outputBuffer, true, "UTF-8") - ss.out = new PrintStream(outputBuffer, true, "UTF-8") - - ss - } + @transient protected[hive] lazy val (hiveconf, sessionState) = + Option(SessionState.get()) + .orElse { + val newState = new SessionState(new HiveConf(classOf[SessionState])) + // Only starts newly created `SessionState` instance. Any existing `SessionState` instance + // returned by `SessionState.get()` must be the most recently started one. + SessionState.start(newState) + Some(newState) + } + .map { state => + setConf(state.getConf.getAllProperties) + if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") + if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") + (state.getConf, state) + } + .get override def setConf(key: String, value: String): Unit = { super.setConf(key, value) @@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) + // Makes sure the session represented by the `sessionState` field is activated. This implies + // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks + // session isolation under multi-user scenarios (i.e. HiveThriftServer2). + // TODO Fix session isolation + if (SessionState.get() != sessionState) { + SessionState.start(sessionState) + } + proc match { case driver: Driver => val results = HiveShim.createDriverResultsArray -- cgit v1.2.3