diff options
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 44 |
1 files changed, 30 insertions, 14 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index fad4091d48..ff8fa44194 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } /** - * SQLConf and HiveConf contracts: when the hive session is first initialized, params in - * HiveConf will get picked up by the SQLConf. Additionally, any properties set by - * set() or a SET command inside sql() will be set in the SQLConf *as well as* - * in the HiveConf. + * SQLConf and HiveConf contracts: + * + * 1. reuse existing started SessionState if any + * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the + * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be + * set in the SQLConf *as well as* in the HiveConf. */ - @transient lazy val hiveconf = new HiveConf(classOf[SessionState]) - @transient protected[hive] lazy val sessionState = { - val ss = new SessionState(hiveconf) - setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. - SessionState.start(ss) - ss.err = new PrintStream(outputBuffer, true, "UTF-8") - ss.out = new PrintStream(outputBuffer, true, "UTF-8") - - ss - } + @transient protected[hive] lazy val (hiveconf, sessionState) = + Option(SessionState.get()) + .orElse { + val newState = new SessionState(new HiveConf(classOf[SessionState])) + // Only starts newly created `SessionState` instance. Any existing `SessionState` instance + // returned by `SessionState.get()` must be the most recently started one. + SessionState.start(newState) + Some(newState) + } + .map { state => + setConf(state.getConf.getAllProperties) + if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") + if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") + (state.getConf, state) + } + .get override def setConf(key: String, value: String): Unit = { super.setConf(key, value) @@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) + // Makes sure the session represented by the `sessionState` field is activated. This implies + // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks + // session isolation under multi-user scenarios (i.e. HiveThriftServer2). + // TODO Fix session isolation + if (SessionState.get() != sessionState) { + SessionState.start(sessionState) + } + proc match { case driver: Driver => val results = HiveShim.createDriverResultsArray |