-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala      | 17
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala            | 18
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                                      | 44
4 files changed, 44 insertions(+), 45 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 3d468d8046..bd4e99492b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,11 +17,8 @@
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
@@ -51,24 +48,12 @@ object HiveThriftServer2 extends Logging {
 
   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }
 
-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)
 
     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -80,7 +65,7 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {
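
The net effect of this hunk: HiveThriftServer2 no longer constructs and starts its own SessionState (the old code even called SessionState.start(ss) twice), and instead initializes the Thrift service with the HiveConf owned by the shared HiveContext. A minimal runnable sketch of that ownership inversion; every name below (Conf, Context, Env, Server) is a hypothetical stand-in for the Spark/Hive classes, not their real API:

// Hypothetical stand-ins: the environment owns the one configuration object;
// the server only consumes it instead of building its own.
final class Conf
final class Context { val conf = new Conf }

object Env {
  var context: Context = _
  def init(): Unit = if (context == null) context = new Context // idempotent, like SparkSQLEnv.init()
}

final class Server {
  def init(conf: Conf): Unit = println(s"server initialized with $conf")
}

object Main extends App {
  Env.init()
  new Server().init(Env.context.conf) // mirrors server.init(SparkSQLEnv.hiveContext.hiveconf)
}

With a single configuration owner, properties set on the command line and properties set via SQL SET statements can no longer diverge between two SessionState instances.
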
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2136a2ea63..5042586351 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,12 +17,10 @@
 package org.apache.spark.sql.hive.thriftserver
 
-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import scala.collection.JavaConversions._
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
         .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
       sparkContext.addSparkListener(new StatsReportListener())
+      hiveContext = new HiveContext(sparkContext)
 
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = {
-          val state = SessionState.get()
-          setConf(state.getConf.getAllProperties)
-          state
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
         }
-
-        @transient override lazy val hiveconf = sessionState.getConf
       }
     }
   }
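
SparkSQLEnv now builds a plain HiveContext and dumps every HiveConf property at DEBUG level, sorted by key so the output is deterministic; the isDebugEnabled guard keeps the sort and string interpolation off the startup path when debug logging is disabled. A runnable sketch of the same idiom against a plain java.util.Properties (using JavaConverters here for explicitness; the diff itself relies on the JavaConversions implicits):

import java.util.Properties
import scala.collection.JavaConverters._

object DumpProps extends App {
  // Stand-in for hiveContext.hiveconf.getAllProperties; the keys/values are made up.
  val props = new Properties()
  props.setProperty("hive.metastore.uris", "thrift://localhost:9083")
  props.setProperty("hive.exec.scratchdir", "/tmp/hive")

  // Sort by key so repeated runs produce identical output.
  props.asScala.toSeq.sortBy(_._1).foreach { case (k, v) =>
    println(s"HiveConf var: $k=$v")
  }
}
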
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index e3b4e45a3d..c60e8fa5b1 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -150,10 +150,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-      val queries = Seq(
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
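
Turning the fixture into one semicolon-separated script keeps the statements readable and lets the test exercise a SET statement through the same JDBC session as the statements that follow it. The naive split is fine for this fixture but would break on semicolons inside string literals. A self-contained sketch (the path is a placeholder, not the suite's real resource):

object SplitStatements extends App {
  val dataFilePath = "/tmp/small_kv.txt" // placeholder path for the sketch

  val queries =
    s"""SET spark.sql.shuffle.partitions=3;
       |CREATE TABLE test(key INT, val STRING);
       |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
       |CACHE TABLE test;
     """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)

  queries.foreach(println) // the suite runs queries.foreach(statement.execute)
}
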
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091d48..ff8fa44194 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. reuse existing started SessionState if any
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
+        // returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
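
The Option(...).orElse(...).map(...).get chain reads as: reuse the thread-local session if one is already started, otherwise create and start a fresh one, then wire it up either way. Since SessionState.get() is backed by thread-local state, the pattern can be modelled in plain Scala; everything below (FakeConf, FakeSession) is a hypothetical stand-in, not the Hive API:

import java.io.{ByteArrayOutputStream, PrintStream}

final class FakeConf
final class FakeSession(val conf: FakeConf) {
  var out: PrintStream = _
  var err: PrintStream = _
}

object FakeSession {
  private val current = new ThreadLocal[FakeSession]
  def get(): FakeSession = current.get() // null when nothing started, like SessionState.get()
  def start(s: FakeSession): Unit = current.set(s)
}

object ReuseOrCreate extends App {
  val outputBuffer = new ByteArrayOutputStream

  lazy val (conf, session) =
    Option(FakeSession.get())             // 1. reuse an existing started session, if any
      .orElse {
        val newState = new FakeSession(new FakeConf)
        FakeSession.start(newState)       // only newly created sessions are started here
        Some(newState)
      }
      .map { state =>                     // 2. wire up whichever session we ended with
        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
        (state.conf, state)
      }
      .get                                // safe: orElse guarantees a Some

  println(session eq FakeSession.get())   // true: context and thread-local now agree
}

Destructuring hiveconf and sessionState from a single lazy val also guarantees the two can never be initialized against different sessions, which was possible with the two separate lazy vals this replaces.
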
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
 
+      // Makes sure the session represented by the `sessionState` field is activated. This implies
+      // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+      // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+      // TODO Fix session isolation
+      if (SessionState.get() != sessionState) {
+        SessionState.start(sessionState)
+      }
+
       proc match {
         case driver: Driver =>
           val results = HiveShim.createDriverResultsArray
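
Hive command processors consult the thread-local SessionState, so if other code started a different session after this context was created, commands would silently run against the wrong one; re-starting the owned session pins it before each dispatch. Continuing the hypothetical FakeSession stand-in from the sketch above:

// Re-activate the owned session before dispatching, mirroring the guard added above.
def runCommand(owned: FakeSession)(cmd: String): Unit = {
  if (FakeSession.get() ne owned) {
    FakeSession.start(owned)    // corresponds to SessionState.start(sessionState)
  }
  println(s"dispatching: $cmd") // a real implementation would hand cmd to the CommandProcessor
}

As the TODO notes, this means all Thrift server connections share one session, so per-connection SET commands leak across users until session isolation is implemented.
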