author     Andrew Or <andrew@databricks.com>    2016-02-27 19:51:28 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-27 19:51:28 -0800
commit     cca79fad66c4315b0ed6de59fd87700a540e6646 (patch)
tree       ca305c4e716f0138e1130bc7d6ab3a5bcf3928cf
parent     4c5e968db23db41c0bea802819ebd75fad63bc2b (diff)
[SPARK-13526][SQL] Move SQLContext per-session states to new class
## What changes were proposed in this pull request?
This creates a `SessionState` class that groups a few fields which previously lived directly in `SQLContext` (sketched below). Because `HiveContext` extends `SQLContext`, we also need to make the corresponding changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.
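At a high level the refactoring looks like this. A minimal sketch with simplified placeholder types, not the real Spark classes; the actual `SessionState` lives in `org.apache.spark.sql.internal` and holds many more components (catalog, analyzer, optimizer, planner, and so on):

```scala
// Minimal sketch only -- placeholder types, not the real Spark classes.
class SQLConf                  // per-session key-value configuration
class FunctionRegistry         // per-session user-registered functions

class SessionState(ctx: SQLContext) {
  // Lazy so that subclasses (e.g. a Hive or test variant) can override individual pieces.
  lazy val conf: SQLConf = new SQLConf
  lazy val functionRegistry: FunctionRegistry = new FunctionRegistry
}

class SQLContext { self =>
  // The context creates its session state lazily and forwards to it, so existing
  // call sites such as `sqlContext.conf` keep working unchanged.
  lazy val sessionState: SessionState = new SessionState(self)
  def conf: SQLConf = sessionState.conf
  def functionRegistry: FunctionRegistry = sessionState.functionRegistry
}
```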
## How was this patch tested?
Existing unit tests; this patch introduces no change in behavior.
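Building on the sketch above, the test contexts now override the whole `sessionState` rather than the individual `conf` field; roughly (simplified sketch, the real `TestSQLContext` also re-applies its override configs after `clear()`):

```scala
// Simplified sketch of the TestSQLContext pattern, reusing the toy classes above.
class TestSQLContext extends SQLContext { self =>
  override lazy val sessionState: SessionState = new SessionState(self) {
    override lazy val conf: SQLConf = new SQLConf {
      // test-only configuration overrides would be pinned here
    }
  }
}
```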
Author: Andrew Or <andrew@databricks.com>
Closes #11405 from andrewor14/refactor-session.
10 files changed, 294 insertions, 164 deletions
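The fields on the new `SessionState` are deliberately `lazy`: they depend on one another (the analyzer is built from the conf, for example), and subclasses such as `HiveSessionState` override some of them. With eager `val`s, a base-class field built from an overridden member would be initialized before the override exists and fail with a `NullPointerException`. A rough illustration with placeholder types, not the real Hive classes:

```scala
// Placeholder types for illustration only.
class Conf { def caseSensitive: Boolean = true }
class Analyzer(val conf: Conf)

class SessionState {
  lazy val conf: Conf = new Conf
  // Built lazily from `conf`, so it picks up whatever `conf` resolves to at first use,
  // including a subclass override.
  lazy val analyzer: Analyzer = new Analyzer(conf)
}

class HiveLikeSessionState extends SessionState {
  // e.g. Hive-style case-insensitive analysis by default
  override lazy val conf: Conf = new Conf {
    override def caseSensitive: Boolean = false
  }
  // `analyzer` is inherited; because both fields are lazy, it is constructed with the
  // overridden conf rather than a half-initialized one.
}
```

With plain `val`s, `analyzer` would be constructed during superclass initialization, before the overriding `conf` is set, which is exactly the NPE scenario the lazy fields avoid.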
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 165280a1b2..9ce37fc753 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -271,13 +271,18 @@ object MimaExcludes {
     ) ++ Seq(
       // SPARK-13220 Deprecate yarn-client and yarn-cluster mode
       ProblemFilters.exclude[MissingMethodProblem](
-        "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler"),
+        "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+    ) ++ Seq(
       // SPARK-13465 TaskContext.
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.addTaskFailureListener")
     ) ++ Seq (
       // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
     ) ++ Seq(
+      // SPARK-13526 Move SQLContext per-session states to new class
+      ProblemFilters.exclude[IncompatibleMethTypeProblem](
+        "org.apache.spark.sql.UDFRegistration.this")
+    ) ++ Seq(
       // [SPARK-13486][SQL] Move SQLConf into an internal package
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e4ae..cb4a6397b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.catalyst.{InternalRow, _}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
@@ -68,7 +66,7 @@ class SQLContext private[sql](
     @transient protected[sql] val cacheManager: CacheManager,
     @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
-  extends org.apache.spark.Logging with Serializable {
+  extends Logging with Serializable {
 
   self =>
 
@@ -115,9 +113,27 @@ class SQLContext private[sql](
   }
 
   /**
-   * @return Spark SQL configuration
+   * Per-session state, e.g. configuration, functions, temporary tables etc.
   */
-  protected[sql] lazy val conf = new SQLConf
+  @transient
+  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
+  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[sql] def catalog: Catalog = sessionState.catalog
+  protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry
+  protected[sql] def analyzer: Analyzer = sessionState.analyzer
+  protected[sql] def optimizer: Optimizer = sessionState.optimizer
+  protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser
+  protected[sql] def planner: SparkPlanner = sessionState.planner
+  protected[sql] def continuousQueryManager = sessionState.continuousQueryManager
+  protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] =
+    sessionState.prepareForExecution
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  @Experimental
+  def listenerManager: ExecutionListenerManager = sessionState.listenerManager
 
   /**
    * Set Spark SQL configuration properties.
@@ -179,43 +195,11 @@ class SQLContext private[sql](
   */
  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
-  @transient
-  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
-  protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)
-
-  @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
-
-  @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
-
-  @transient
-  protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        datasources.PreWriteCheck(catalog)
-      )
-    }
-
-  @transient
-  protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
-
-  @transient
-  protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
-
   protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
 
-  protected[sql] def executeSql(sql: String):
-    org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
+  protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
 
-  protected[sql] def executePlan(plan: LogicalPlan) =
-    new sparkexecution.QueryExecution(this, plan)
+  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
 
   /**
    * Add a jar to SQLContext
@@ -299,10 +283,8 @@ class SQLContext private[sql](
   *
   * @group basic
   * @since 1.3.0
-   * TODO move to SQLSession?
   */
-  @transient
-  val udf: UDFRegistration = new UDFRegistration(this)
+  def udf: UDFRegistration = sessionState.udf
 
  /**
   * Returns true if the table is currently cached in-memory.
@@ -873,25 +855,9 @@ class SQLContext private[sql](
   }
 
   @transient
-  protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
-
-  @transient
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
-   * row format conversions as needed.
-   */
-  @transient
-  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches = Seq(
-      Batch("Subquery", Once, PlanSubqueries(self)),
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
-    )
-  }
-
-  /**
   * Parses the data type in our internal string representation. The data type string should
   * have the same format as the one generated by `toString` in scala.
   * It is only used by PySpark.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index de01cbcb0e..d894825632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.api.java._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType
  *
  * @since 1.3.0
  */
-class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
-  private val functionRegistry = sqlContext.functionRegistry
+class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging {
 
   protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
     log.debug(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
new file mode 100644
index 0000000000..f93a405f77
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration}
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */
+private[sql] class SessionState(ctx: SQLContext) {
+
+  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+  /**
+   * SQL-specific key-value configurations.
+   */
+  lazy val conf = new SQLConf
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  lazy val catalog: Catalog = new SimpleCatalog(conf)
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   */
+  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   */
+  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+
+  /**
+   * Logical query plan analyzer for resolving unresolved attributes and relations.
+   */
+  lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        python.ExtractPythonUDFs ::
+        PreInsertCastAndRename ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Logical query plan optimizer.
+   */
+  lazy val optimizer: Optimizer = new SparkOptimizer(ctx)
+
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+   */
+  lazy val sqlParser: ParserInterface = new SparkQl(conf)
+
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   */
+  lazy val planner: SparkPlanner = new SparkPlanner(ctx)
+
+  /**
+   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
+   */
+  lazy val prepareForExecution = new RuleExecutor[SparkPlan] {
+    override val batches: Seq[Batch] = Seq(
+      Batch("Subquery", Once, PlanSubqueries(ctx)),
+      Batch("Add exchange", Once, EnsureRequirements(ctx)),
+      Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx))
+    )
+  }
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
+
+  /**
+   * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+   */
+  lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 28ad7ae64a..b3e146fba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 /**
  * A special [[SQLContext]] prepared for testing.
@@ -31,16 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
         new SparkConf().set("spark.sql.testkey", "true")))
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      // Make sure we start with the default test configs even after clear
-      TestSQLContext.overrideConfs.foreach {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def clear(): Unit = {
+          super.clear()
+          // Make sure we start with the default test configs even after clear
+          TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+        }
+      }
       }
     }
   }
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d15f883138..0dc2a95eea 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Enable in-memory partition pruning for testing purposes
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Use Hive hash expression instead of the native one
-    TestHive.functionRegistry.unregisterFunction("hash")
+    TestHive.sessionState.functionRegistry.unregisterFunction("hash")
     RuleExecutor.resetTime()
   }
 
@@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     Locale.setDefault(originalLocale)
     TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-    TestHive.functionRegistry.restore()
+    TestHive.sessionState.functionRegistry.restore()
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules.
     logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d511dd685c..a9295d31c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.types._
@@ -110,6 +108,16 @@ class HiveContext private[hive](
       isRootContext = false)
   }
 
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(self)
+
+  protected[sql] override def catalog = sessionState.catalog
+
+  // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
+  // but the optimizer can't access the SessionState of metadataHive.
+  sessionState.functionRegistry.registerFunction(
+    "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -442,39 +450,6 @@ class HiveContext private[hive](
     setConf(entry.key, entry.stringConverter(value))
   }
 
-  /* A catalyst metadata catalog that points to the Hive Metastore. */
-  @transient
-  override protected[sql] lazy val catalog =
-    new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
-
-  // Note that HiveUDFs will be overridden by functions registered in this context.
-  @transient
-  override protected[sql] lazy val functionRegistry: FunctionRegistry =
-    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
-
-  // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
-  // can't access the SessionState of metadataHive.
-  functionRegistry.registerFunction(
-    "current_database",
-    (expressions: Seq[Expression]) => new CurrentDatabase(this))
-
-  /* An analyzer that uses the Hive metastore. */
-  @transient
-  override protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.CreateTables ::
-        catalog.PreInsertionCasts ::
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(catalog)
-      )
-    }
-
   /** Overridden by child classes that need to set configuration before the client init. */
   protected def configure(): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -544,37 +519,6 @@ class HiveContext private[hive](
     c
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-  }
-
-  @transient
-  protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf)
-
-  @transient
-  private val hivePlanner = new SparkPlanner(this) with HiveStrategies {
-    val hiveContext = self
-
-    override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
-      DataSourceStrategy,
-      HiveCommandStrategy(self),
-      HiveDDLStrategy,
-      DDLStrategy,
-      SpecialLimits,
-      InMemoryScans,
-      HiveTableScans,
-      DataSinks,
-      Scripts,
-      Aggregation,
-      LeftSemiJoin,
-      EquiJoinSelection,
-      BasicOperators,
-      BroadcastNestedLoop,
-      CartesianProduct,
-      DefaultJoin
-    )
-  }
-
   private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
     ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
@@ -590,9 +534,6 @@ class HiveContext private[hive](
     }
   }
 
-  @transient
-  override protected[sql] val planner = hivePlanner
-
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
     extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
new file mode 100644
index 0000000000..09f54be04d
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+
+
+/**
+ * A class that holds all session-specific state in a given [[HiveContext]].
+ */
+private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+
+  override lazy val conf: SQLConf = new SQLConf {
+    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+  }
+
+  /**
+   * A metadata catalog that points to the Hive metastore.
+   */
+  override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   * Note that HiveUDFs will be overridden by functions registered in this context.
+   */
+  override lazy val functionRegistry: FunctionRegistry = {
+    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
+  }
+
+  /**
+   * An analyzer that uses the Hive metastore.
+   */
+  override lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.CreateTables ::
+        catalog.PreInsertionCasts ::
+        python.ExtractPythonUDFs ::
+        PreInsertCastAndRename ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Parser for HiveQl query texts.
+   */
+  override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override lazy val planner: SparkPlanner = {
+    new SparkPlanner(ctx) with HiveStrategies {
+      override val hiveContext = ctx
+
+      override def strategies: Seq[Strategy] = {
+        ctx.experimental.extraStrategies ++ Seq(
+          DataSourceStrategy,
+          HiveCommandStrategy(ctx),
+          HiveDDLStrategy,
+          DDLStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          DataSinks,
+          Scripts,
+          Aggregation,
+          LeftSemiJoin,
+          EquiJoinSelection,
+          BasicOperators,
+          BroadcastNestedLoop,
+          CartesianProduct,
+          DefaultJoin
+        )
+      }
+    }
+  }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9d0622bea9..a7eca46d19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,18 +120,25 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      TestHiveContext.overrideConfs.map {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(this) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+        override def clear(): Unit = {
+          super.clear()
+          TestHiveContext.overrideConfs.map {
+            case (key, value) => setConfString(key, value)
+          }
+        }
       }
     }
+
+    override lazy val functionRegistry = {
+      new TestHiveFunctionRegistry(
+        org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+    }
   }
 
   /**
@@ -454,9 +461,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     }
   }
 
-  @transient
-  override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
-    org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
 }
 
 private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6e4b..d81c566822 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   test("Caching converted data source Parquet Relations") {
-    def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+    val _catalog = catalog
+    def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
@@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+    var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
@@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
 
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """
        |