From cca79fad66c4315b0ed6de59fd87700a540e6646 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 27 Feb 2016 19:51:28 -0800
Subject: [SPARK-13526][SQL] Move SQLContext per-session states to new class

## What changes were proposed in this pull request?

This creates a `SessionState`, which groups a few fields that existed in
`SQLContext`. Because `HiveContext` extends `SQLContext` we also need to make
changes there. This is mainly a cleanup task that will soon pave the way for
merging the two contexts.

## How was this patch tested?

Existing unit tests; this patch introduces no change in behavior.

Author: Andrew Or

Closes #11405 from andrewor14/refactor-session.
---
 .../hive/execution/HiveCompatibilitySuite.scala    |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala    |  81 +++------------
 .../apache/spark/sql/hive/HiveSessionState.scala   | 103 +++++++++++++++++++++
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  30 +++---
 .../org/apache/spark/sql/hive/parquetSuites.scala  |   7 +-
 5 files changed, 137 insertions(+), 88 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d15f883138..0dc2a95eea 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Enable in-memory partition pruning for testing purposes
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Use Hive hash expression instead of the native one
-    TestHive.functionRegistry.unregisterFunction("hash")
+    TestHive.sessionState.functionRegistry.unregisterFunction("hash")
     RuleExecutor.resetTime()
   }
 
@@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     Locale.setDefault(originalLocale)
     TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-    TestHive.functionRegistry.restore()
+    TestHive.sessionState.functionRegistry.restore()
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules.
     logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d511dd685c..a9295d31c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.types._
@@ -110,6 +108,16 @@ class HiveContext private[hive](
       isRootContext = false)
   }
 
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(self)
+
+  protected[sql] override def catalog = sessionState.catalog
+
+  // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
+  // but the optimizer can't access the SessionState of metadataHive.
+  sessionState.functionRegistry.registerFunction(
+    "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -442,39 +450,6 @@ class HiveContext private[hive](
     setConf(entry.key, entry.stringConverter(value))
   }
 
-  /* A catalyst metadata catalog that points to the Hive Metastore. */
-  @transient
-  override protected[sql] lazy val catalog =
-    new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
-
-  // Note that HiveUDFs will be overridden by functions registered in this context.
-  @transient
-  override protected[sql] lazy val functionRegistry: FunctionRegistry =
-    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
-
-  // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
-  // can't access the SessionState of metadataHive.
-  functionRegistry.registerFunction(
-    "current_database",
-    (expressions: Seq[Expression]) => new CurrentDatabase(this))
-
-  /* An analyzer that uses the Hive metastore. */
-  @transient
-  override protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.CreateTables ::
-        catalog.PreInsertionCasts ::
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(catalog)
-      )
-    }
-
   /** Overridden by child classes that need to set configuration before the client init. */
   protected def configure(): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -544,37 +519,6 @@ class HiveContext private[hive](
     c
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-  }
-
-  @transient
-  protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf)
-
-  @transient
-  private val hivePlanner = new SparkPlanner(this) with HiveStrategies {
-    val hiveContext = self
-
-    override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
-      DataSourceStrategy,
-      HiveCommandStrategy(self),
-      HiveDDLStrategy,
-      DDLStrategy,
-      SpecialLimits,
-      InMemoryScans,
-      HiveTableScans,
-      DataSinks,
-      Scripts,
-      Aggregation,
-      LeftSemiJoin,
-      EquiJoinSelection,
-      BasicOperators,
-      BroadcastNestedLoop,
-      CartesianProduct,
-      DefaultJoin
-    )
-  }
-
   private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
     ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
 
@@ -590,9 +534,6 @@ class HiveContext private[hive](
     }
   }
 
-  @transient
-  override protected[sql] val planner = hivePlanner
-
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
     extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
new file mode 100644
index 0000000000..09f54be04d
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+
+
+/**
+ * A class that holds all session-specific state in a given [[HiveContext]].
+ */
+private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+
+  override lazy val conf: SQLConf = new SQLConf {
+    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+  }
+
+  /**
+   * A metadata catalog that points to the Hive metastore.
+   */
+  override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   * Note that HiveUDFs will be overridden by functions registered in this context.
+   */
+  override lazy val functionRegistry: FunctionRegistry = {
+    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
+  }
+
+  /**
+   * An analyzer that uses the Hive metastore.
+   */
+  override lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.CreateTables ::
+        catalog.PreInsertionCasts ::
+        python.ExtractPythonUDFs ::
+        PreInsertCastAndRename ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Parser for HiveQl query texts.
+   */
+  override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override lazy val planner: SparkPlanner = {
+    new SparkPlanner(ctx) with HiveStrategies {
+      override val hiveContext = ctx
+
+      override def strategies: Seq[Strategy] = {
+        ctx.experimental.extraStrategies ++ Seq(
+          DataSourceStrategy,
+          HiveCommandStrategy(ctx),
+          HiveDDLStrategy,
+          DDLStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          DataSinks,
+          Scripts,
+          Aggregation,
+          LeftSemiJoin,
+          EquiJoinSelection,
+          BasicOperators,
+          BroadcastNestedLoop,
+          CartesianProduct,
+          DefaultJoin
+        )
+      }
+    }
+  }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9d0622bea9..a7eca46d19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,18 +120,25 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      TestHiveContext.overrideConfs.map {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(this) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+        override def clear(): Unit = {
+          super.clear()
+          TestHiveContext.overrideConfs.map {
+            case (key, value) => setConfString(key, value)
+          }
+        }
       }
     }
+
+    override lazy val functionRegistry = {
+      new TestHiveFunctionRegistry(
+        org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+    }
   }
 
   /**
@@ -454,9 +461,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     }
   }
 
-  @transient
-  override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
-    org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
 }
 
 private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6e4b..d81c566822 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   test("Caching converted data source Parquet Relations") {
-    def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+    val _catalog = catalog
+    def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
@@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+    var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
@@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """
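For readers skimming the hunks above, the shape of the refactoring is easy to miss. The sketch below is a minimal, self-contained illustration of the pattern this patch applies, not the actual Spark classes: all names (`ExampleContext`, the toy `FunctionRegistry`, the string-based functions) are simplified stand-ins. The idea is that per-session components move off the context into a `SessionState` whose members are overridable lazy vals, and callers reach them through `context.sessionState`, as the `TestHive.sessionState.functionRegistry` changes in `HiveCompatibilitySuite` show.

```scala
import scala.collection.mutable

// Toy registry standing in for Spark's FunctionRegistry (illustration only).
class FunctionRegistry {
  private val functions = mutable.Map.empty[String, Seq[String] => String]
  def registerFunction(name: String, f: Seq[String] => String): Unit = functions(name) = f
  def lookup(name: String): Option[Seq[String] => String] = functions.get(name)
}

// Base session state: groups the per-session fields that used to live on the context.
class SessionState(ctx: ExampleContext) {
  lazy val functionRegistry: FunctionRegistry = new FunctionRegistry
}

// A Hive-flavoured session state overrides only the members it needs to change,
// mirroring how HiveSessionState overrides catalog/analyzer/planner in the patch.
class HiveSessionState(ctx: ExampleContext) extends SessionState(ctx) {
  override lazy val functionRegistry: FunctionRegistry = {
    val registry = new FunctionRegistry
    registry.registerFunction("current_database", _ => "default")
    registry
  }
}

class ExampleContext { self =>
  // Subclasses swap in a different SessionState, as HiveContext does with HiveSessionState.
  lazy val sessionState: SessionState = new SessionState(self)
}

class ExampleHiveContext extends ExampleContext {
  override lazy val sessionState: SessionState = new HiveSessionState(this)
}

object Demo extends App {
  val ctx = new ExampleHiveContext
  // Callers go through sessionState instead of a field on the context.
  println(ctx.sessionState.functionRegistry.lookup("current_database").isDefined) // true
}
```

Under this arrangement a test context only needs to override `sessionState` once, rather than re-declaring `conf` and `functionRegistry` separately on the context, which is what the TestHive.scala hunk above does.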