about summary refs log tree commit diff
path: root/sql/hive
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 44
1 file changed, 30 insertions, 14 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091d48..ff8fa44194 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
/**
- * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
- * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
- * set() or a SET command inside sql() will be set in the SQLConf *as well as*
- * in the HiveConf.
+ * SQLConf and HiveConf contracts:
+ *
+ * 1. reuse existing started SessionState if any
+ * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+ * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+ * set in the SQLConf *as well as* in the HiveConf.
*/
- @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
- @transient protected[hive] lazy val sessionState = {
- val ss = new SessionState(hiveconf)
- setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
- SessionState.start(ss)
- ss.err = new PrintStream(outputBuffer, true, "UTF-8")
- ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
- ss
- }
+ @transient protected[hive] lazy val (hiveconf, sessionState) =
+ Option(SessionState.get())
+ .orElse {
+ val newState = new SessionState(new HiveConf(classOf[SessionState]))
+ // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+ // returned by `SessionState.get()` must be the most recently started one.
+ SessionState.start(newState)
+ Some(newState)
+ }
+ .map { state =>
+ setConf(state.getConf.getAllProperties)
+ if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+ if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+ (state.getConf, state)
+ }
+ .get
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
+ // Makes sure the session represented by the `sessionState` field is activated. This implies
+ // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+ // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+ // TODO Fix session isolation
+ if (SessionState.get() != sessionState) {
+ SessionState.start(sessionState)
+ }
+
proc match {
case driver: Driver =>
val results = HiveShim.createDriverResultsArray