From 12a345adcbaee359199ddfed4f41bf0e19d66d48 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 17 Mar 2015 01:09:27 +0800 Subject: [SPARK-2087] [SQL] Multiple thriftserver sessions with single HiveContext instance Still, we keep only a single HiveContext within ThriftServer, and we also create a object called `SQLSession` for isolating the different user states. Developers can obtain/release a new user session via `openSession` and `closeSession`, and `SQLContext` and `HiveContext` will also provide a default session if no `openSession` called, for backward-compatibility. Author: Cheng Hao Closes #4885 from chenghao-intel/multisessions_singlecontext and squashes the following commits: 1c47b2a [Cheng Hao] rename the tss => tlSession 815b27a [Cheng Hao] code style issue 57e3fa0 [Cheng Hao] openSession is not compatible between Hive0.12 & 0.13.1 4665b0d [Cheng Hao] thriftservice with single context --- .../scala/org/apache/spark/sql/SQLContext.scala | 43 +++++++++++++++++++++- .../org/apache/spark/sql/test/TestSQLContext.scala | 17 +++++++-- 2 files changed, 54 insertions(+), 6 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 9c49e84bf9..297d0d644a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -63,8 +63,10 @@ class SQLContext(@transient val sparkContext: SparkContext) def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) - // Note that this is a lazy val so we can override the default value in subclasses. - protected[sql] lazy val conf: SQLConf = new SQLConf + /** + * @return Spark SQL configuration + */ + protected[sql] def conf = tlSession.get().conf /** * Set Spark SQL configuration properties. @@ -103,9 +105,11 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs + // TODO how to handle the temp table per user session? @transient protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true) + // TODO how to handle the temp function per user session? @transient protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true) @@ -138,6 +142,14 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan) + @transient + protected[sql] val tlSession = new ThreadLocal[SQLSession]() { + override def initialValue = defaultSession + } + + @transient + protected[sql] val defaultSession = createSession() + sparkContext.getConf.getAll.foreach { case (key, value) if key.startsWith("spark.sql") => setConf(key, value) case _ => @@ -194,6 +206,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * }}} * * @group basic + * TODO move to SQLSession? */ @transient val udf: UDFRegistration = new UDFRegistration(this) @@ -1059,6 +1072,32 @@ class SQLContext(@transient val sparkContext: SparkContext) ) } + + protected[sql] def openSession(): SQLSession = { + detachSession() + val session = createSession() + tlSession.set(session) + + session + } + + protected[sql] def currentSession(): SQLSession = { + tlSession.get() + } + + protected[sql] def createSession(): SQLSession = { + new this.SQLSession() + } + + protected[sql] def detachSession(): Unit = { + tlSession.remove() + } + + protected[sql] class SQLSession { + // Note that this is a lazy val so we can override the default value in subclasses. + protected[sql] lazy val conf: SQLConf = new SQLConf + } + /** * :: DeveloperApi :: * The primary workflow for executing relational queries using Spark. Designed to allow easy diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala index 4e1ec38bd0..356a6100d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -24,16 +24,22 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** A SQLContext that can be used for local testing. */ -object TestSQLContext +class LocalSQLContext extends SQLContext( new SparkContext( "local[2]", "TestSQLContext", new SparkConf().set("spark.sql.testkey", "true"))) { - /** Fewer partitions to speed up testing. */ - protected[sql] override lazy val conf: SQLConf = new SQLConf { - override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + override protected[sql] def createSession(): SQLSession = { + new this.SQLSession() + } + + protected[sql] class SQLSession extends super.SQLSession { + protected[sql] override lazy val conf: SQLConf = new SQLConf { + /** Fewer partitions to speed up testing. */ + override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + } } /** @@ -45,3 +51,6 @@ object TestSQLContext } } + +object TestSQLContext extends LocalSQLContext + -- cgit v1.2.3