author    Cheng Hao <hao.cheng@intel.com>    2015-03-17 01:09:27 +0800
committer Cheng Lian <lian@databricks.com>   2015-03-17 01:09:27 +0800
commit 12a345adcbaee359199ddfed4f41bf0e19d66d48 (patch)
tree   3e1a93a7952024af0c14945efd577c79461231c2 /sql/core
parent 00e730b94cba1202a73af1e2476ff5a44af4b6b2 (diff)
[SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance
As before, we keep only a single HiveContext within the Thrift server, but we introduce an object called `SQLSession` to isolate the state of different users. Developers can obtain and release a new user session via `openSession` and `closeSession`; for backward compatibility, `SQLContext` and `HiveContext` also provide a default session when `openSession` is never called.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits:

1c47b2a [Cheng Hao] rename the tss => tlSession
815b27a [Cheng Hao] code style issue
57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1
4665b0d [Cheng Hao] thriftservice with single context
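At its core the patch replaces the context-level `conf` with a `ThreadLocal[SQLSession]` backed by a shared default. The following is a minimal standalone Scala sketch of that pattern, not Spark's actual code: `MyConf` is a hypothetical stand-in for `SQLConf`, and access modifiers are simplified (the real methods are `protected[sql]`).

// Standalone sketch of the thread-local session pattern; MyConf is a
// hypothetical stand-in for SQLConf, the other names mirror the patch.
class MyConf { var numShufflePartitions: Int = 200 }

class Context {
  class SQLSession {
    // Lazy val so subclasses can override it with a customized conf.
    lazy val conf: MyConf = new MyConf
  }

  protected def createSession(): SQLSession = new SQLSession

  // Shared fallback for threads that never call openSession().
  private val defaultSession = createSession()

  private val tlSession = new ThreadLocal[SQLSession]() {
    override def initialValue(): SQLSession = defaultSession
  }

  // Every conf access resolves through the calling thread's session.
  def conf: MyConf = tlSession.get().conf

  def openSession(): SQLSession = {
    detachSession()               // unbind any session already on this thread
    val session = createSession()
    tlSession.set(session)
    session
  }

  def currentSession(): SQLSession = tlSession.get()

  def detachSession(): Unit = tlSession.remove()
}

Because `initialValue` returns the shared `defaultSession`, threads that never call `openSession` keep the old single-session behavior.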
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala           43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala  17
2 files changed, 54 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9c49e84bf9..297d0d644a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -63,8 +63,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
-  // Note that this is a lazy val so we can override the default value in subclasses.
-  protected[sql] lazy val conf: SQLConf = new SQLConf
+  /**
+   * @return Spark SQL configuration
+   */
+  protected[sql] def conf = tlSession.get().conf
 
   /**
    * Set Spark SQL configuration properties.
@@ -103,9 +105,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
+  // TODO how to handle the temp table per user session?
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
 
+  // TODO how to handle the temp function per user session?
   @transient
   protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)
@@ -138,6 +142,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
 
+  @transient
+  protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
+    override def initialValue = defaultSession
+  }
+
+  @transient
+  protected[sql] val defaultSession = createSession()
+
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
@@ -194,6 +206,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * }}}
    *
    * @group basic
+   * TODO move to SQLSession?
    */
   @transient
   val udf: UDFRegistration = new UDFRegistration(this)
@@ -1059,6 +1072,32 @@ class SQLContext(@transient val sparkContext: SparkContext)
     )
   }
 
+
+  protected[sql] def openSession(): SQLSession = {
+    detachSession()
+    val session = createSession()
+    tlSession.set(session)
+
+    session
+  }
+
+  protected[sql] def currentSession(): SQLSession = {
+    tlSession.get()
+  }
+
+  protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[sql] def detachSession(): Unit = {
+    tlSession.remove()
+  }
+
+  protected[sql] class SQLSession {
+    // Note that this is a lazy val so we can override the default value in subclasses.
+    protected[sql] lazy val conf: SQLConf = new SQLConf
+  }
+
   /**
    * :: DeveloperApi ::
    * The primary workflow for executing relational queries using Spark. Designed to allow easy
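With the hunks above, every `conf` lookup in `SQLContext` goes through the calling thread's session, so concurrent Thrift server users stop clobbering each other's settings. A hedged illustration of the isolation, reusing the hypothetical `Context` sketched earlier (in Spark itself `openSession`/`detachSession` are `protected[sql]`, so real callers such as the Thrift server live inside the `org.apache.spark.sql` package):

// Two threads, two sessions: conf changes stay with the thread that made them.
object SessionIsolationDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new Context

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        ctx.openSession()                   // fresh session bound to this thread
        ctx.conf.numShufflePartitions = 10  // mutate only this thread's conf
        println(s"worker sees ${ctx.conf.numShufflePartitions}")  // 10
        ctx.detachSession()                 // back to the shared default
      }
    })
    worker.start()
    worker.join()

    // The main thread never opened a session, so it still sees the default.
    println(s"main sees ${ctx.conf.numShufflePartitions}")        // 200
  }
}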
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 4e1ec38bd0..356a6100d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -24,16 +24,22 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /** A SQLContext that can be used for local testing. */
-object TestSQLContext
+class LocalSQLContext
   extends SQLContext(
     new SparkContext(
       "local[2]",
       "TestSQLContext",
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
-  /** Fewer partitions to speed up testing. */
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  override protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[sql] class SQLSession extends super.SQLSession {
+    protected[sql] override lazy val conf: SQLConf = new SQLConf {
+      /** Fewer partitions to speed up testing. */
+      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+    }
   }
 
   /**
@@ -45,3 +51,6 @@ object TestSQLContext
   }
 }
+
+object TestSQLContext extends LocalSQLContext
+
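The TestSQLContext rewrite above shows the intended extension point: rather than overriding `conf` on the context, a subclass overrides `createSession()` and nests a `SQLSession` subclass with a customized `conf`. A sketch of that shape against the hypothetical `Context` from earlier (names are illustrative, not Spark's):

// Subclasses customize the session, not the context-level conf.
class TestContext extends Context {
  override protected def createSession(): SQLSession = new this.SQLSession()

  class SQLSession extends super.SQLSession {
    // Fewer shuffle partitions to speed up testing, mirroring LocalSQLContext.
    override lazy val conf: MyConf = new MyConf { numShufflePartitions = 5 }
  }
}

The old `object TestSQLContext` then shrinks to a one-line singleton extending the new class, so existing test code that references `TestSQLContext` keeps compiling unchanged.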