author    | Andrew Or <andrew@databricks.com>  | 2016-02-27 19:51:28 -0800
committer | Reynold Xin <rxin@databricks.com>  | 2016-02-27 19:51:28 -0800
commit    | cca79fad66c4315b0ed6de59fd87700a540e6646 (patch)
tree      | ca305c4e716f0138e1130bc7d6ab3a5bcf3928cf /sql/core/src
parent    | 4c5e968db23db41c0bea802819ebd75fad63bc2b (diff)
[SPARK-13526][SQL] Move SQLContext per-session states to new class
## What changes were proposed in this pull request?
This creates a `SessionState` class, which groups a few fields that previously lived directly in `SQLContext`. Because `HiveContext` extends `SQLContext`, we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.
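A minimal sketch of the grouping-and-delegation pattern described above (the `SQLContextSketch`/`SessionStateSketch` names are hypothetical stand-ins, not the classes touched by this patch): the per-session fields move into a single state object, and the context keeps thin accessors so existing call sites are unaffected.

```scala
// Sketch of the refactoring pattern only; not the real Spark classes.
class SQLContextSketch { self =>
  // All per-session state is grouped in one lazily created object...
  protected lazy val sessionState = new SessionStateSketch(self)

  // ...and the members that used to live on the context become thin delegates.
  protected def conf: java.util.Properties = sessionState.conf
  protected def functionRegistry: scala.collection.mutable.Map[String, AnyRef] =
    sessionState.functionRegistry
}

class SessionStateSketch(ctx: SQLContextSketch) {
  // lazy vals, so fields that depend on each other (e.g. conf) initialize on demand
  lazy val conf = new java.util.Properties
  lazy val functionRegistry = scala.collection.mutable.Map.empty[String, AnyRef]
}
```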
## How was this patch tested?
Existing unit tests; this patch introduces no change in behavior.
Author: Andrew Or <andrew@databricks.com>
Closes #11405 from andrewor14/refactor-session.
Diffstat (limited to 'sql/core/src')
4 files changed, 151 insertions, 75 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e4ae..cb4a6397b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.catalyst.{InternalRow, _}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
@@ -68,7 +66,7 @@ class SQLContext private[sql](
     @transient protected[sql] val cacheManager: CacheManager,
     @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
-  extends org.apache.spark.Logging with Serializable {
+  extends Logging with Serializable {
 
   self =>
 
@@ -115,9 +113,27 @@ class SQLContext private[sql](
   }
 
   /**
-   * @return Spark SQL configuration
+   * Per-session state, e.g. configuration, functions, temporary tables etc.
    */
-  protected[sql] lazy val conf = new SQLConf
+  @transient
+  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
+  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[sql] def catalog: Catalog = sessionState.catalog
+  protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry
+  protected[sql] def analyzer: Analyzer = sessionState.analyzer
+  protected[sql] def optimizer: Optimizer = sessionState.optimizer
+  protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser
+  protected[sql] def planner: SparkPlanner = sessionState.planner
+  protected[sql] def continuousQueryManager = sessionState.continuousQueryManager
+  protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] =
+    sessionState.prepareForExecution
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  @Experimental
+  def listenerManager: ExecutionListenerManager = sessionState.listenerManager
 
   /**
    * Set Spark SQL configuration properties.
@@ -179,43 +195,11 @@ class SQLContext private[sql](
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
-  @transient
-  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
-  protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)
-
-  @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
-
-  @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
-
-  @transient
-  protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        datasources.PreWriteCheck(catalog)
-      )
-    }
-
-  @transient
-  protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
-
-  @transient
-  protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
-
   protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
 
-  protected[sql] def executeSql(sql: String):
-    org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
+  protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
 
-  protected[sql] def executePlan(plan: LogicalPlan) =
-    new sparkexecution.QueryExecution(this, plan)
+  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
 
   /**
    * Add a jar to SQLContext
@@ -299,10 +283,8 @@ class SQLContext private[sql](
    *
    * @group basic
    * @since 1.3.0
-   * TODO move to SQLSession?
    */
-  @transient
-  val udf: UDFRegistration = new UDFRegistration(this)
+  def udf: UDFRegistration = sessionState.udf
 
   /**
    * Returns true if the table is currently cached in-memory.
@@ -873,25 +855,9 @@ class SQLContext private[sql](
   }
 
   @transient
-  protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
-
-  @transient
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
-   * row format conversions as needed.
-   */
-  @transient
-  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches = Seq(
-      Batch("Subquery", Once, PlanSubqueries(self)),
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
-    )
-  }
-
-  /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
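None of this should be visible to end users: `conf`, `catalog`, `udf`, and friends remain reachable on `SQLContext`; they simply delegate to `sessionState` now. As a quick sanity check, a typical UDF registration looks the same before and after the patch. The snippet below is a usage sketch (not part of the patch), assuming a local `SparkContext`, e.g. pasted into spark-shell:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Usage sketch: the public API is unchanged; sqlContext.udf now resolves to
// sessionState.udf internally, but the call site does not change.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("session-state-demo"))
val sqlContext = new SQLContext(sc)

sqlContext.udf.register("strlen", (s: String) => s.length)
sqlContext.sql("SELECT strlen('spark') AS n").show()
```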
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index de01cbcb0e..d894825632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.api.java._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType
  *
  * @since 1.3.0
  */
-class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
-  private val functionRegistry = sqlContext.functionRegistry
+class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging {
 
   protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
     log.debug(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
new file mode 100644
index 0000000000..f93a405f77
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration}
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */
+private[sql] class SessionState(ctx: SQLContext) {
+
+  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+  /**
+   * SQL-specific key-value configurations.
+   */
+  lazy val conf = new SQLConf
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  lazy val catalog: Catalog = new SimpleCatalog(conf)
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   */
+  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   */
+  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+
+  /**
+   * Logical query plan analyzer for resolving unresolved attributes and relations.
+   */
+  lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        python.ExtractPythonUDFs ::
+        PreInsertCastAndRename ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Logical query plan optimizer.
+   */
+  lazy val optimizer: Optimizer = new SparkOptimizer(ctx)
+
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+   */
+  lazy val sqlParser: ParserInterface = new SparkQl(conf)
+
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   */
+  lazy val planner: SparkPlanner = new SparkPlanner(ctx)
+
+  /**
+   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
+   */
+  lazy val prepareForExecution = new RuleExecutor[SparkPlan] {
+    override val batches: Seq[Batch] = Seq(
+      Batch("Subquery", Once, PlanSubqueries(ctx)),
+      Batch("Add exchange", Once, EnsureRequirements(ctx)),
+      Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx))
+    )
+  }
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
+
+  /**
+   * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+   */
+  lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 28ad7ae64a..b3e146fba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 /**
  * A special [[SQLContext]] prepared for testing.
@@ -31,16 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
       new SparkConf().set("spark.sql.testkey", "true")))
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      // Make sure we start with the default test configs even after clear
-      TestSQLContext.overrideConfs.foreach {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def clear(): Unit = {
+          super.clear()
+          // Make sure we start with the default test configs even after clear
+          TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+        }
       }
     }
   }
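The new `SessionState` explains its all-lazy-vals design: the fields depend on one another (e.g. on `conf`), and subclasses such as the anonymous `SessionState` in `TestSQLContext` above need to override some of them. A minimal, self-contained sketch of why that matters (hypothetical `StrictState`/`LazyState` classes, not from this patch):

```scala
// Hypothetical StrictState/LazyState classes showing why SessionState uses lazy
// vals for fields that read each other during initialization.
class StrictState {
  val conf: java.util.Properties = new java.util.Properties
  val catalogSize: Int = conf.size() // evaluated eagerly, inside the constructor
}

class LazyState {
  lazy val conf: java.util.Properties = new java.util.Properties
  lazy val catalogSize: Int = conf.size() // evaluated on first access, after construction
}

object LazyValDemo extends App {
  // Overriding a strict val: the superclass constructor still sees conf == null -> NPE.
  try {
    new StrictState {
      override val conf = { val p = new java.util.Properties; p.put("k", "v"); p }
    }
  } catch {
    case e: NullPointerException => println("strict vals: " + e)
  }

  // Overriding the lazy val is safe, which is what TestSQLContext does with its custom conf.
  val s = new LazyState {
    override lazy val conf = { val p = new java.util.Properties; p.put("k", "v"); p }
  }
  println("lazy vals: catalogSize = " + s.catalogSize) // prints 1
}
```

With strict vals, the superclass constructor reads the overridden `conf` before the subclass has initialized it, which is the source of the NPEs the comment in `SessionState` refers to; lazy vals defer evaluation until first access, after construction has finished.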