| author | Herman van Hovell <hvanhovell@databricks.com> | 2017-03-28 10:07:24 +0800 |
|---|---|---|
| committer | Wenchen Fan <wenchen@databricks.com> | 2017-03-28 10:07:24 +0800 |
| commit | ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 (patch) | |
| tree | f3014ba709d54b48172a399708074480a6ed9661 /sql/hive | |
| parent | 8a6f33f0483dcee81467e6374a796b5dbd53ea30 (diff) | |
[SPARK-20100][SQL] Refactor SessionState initialization
## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex: part of the construction happens in the SessionState companion objects, part is done inside the SessionState class itself, and part is done by passing creation functions around.
This PR refactors that code path and consolidates SessionState initialization into a builder class. The SessionState itself no longer performs any initialization and becomes a plain placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:
1. It provides a starting point for removing `HiveSessionState`. Removing `HiveSessionState` entirely would also require moving resource loading into a separate class and (re)moving the metadata Hive client.
2. It makes it easier to customize a `SparkSession`. For now this still requires creating a custom version of the builder; hooks have been added to facilitate that (see the sketch below). A future step will be to create a semi-stable API on top of this.
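To illustrate the hooks, here is a minimal, hypothetical sketch of a custom builder. It relies on the `customResolutionRules` and `newBuilder` extension points that `BaseSessionStateBuilder` exposes in this patch; `MySessionStateBuilder` and its no-op rule are illustrative names, not part of the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}

// Hypothetical custom builder: injects an extra analyzer rule without
// re-implementing any of the SessionState wiring.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // A no-op resolution rule, purely for illustration.
  object NoopRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Hook: extra resolution rules picked up by the analyzer this builder creates.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    NoopRule +: super.customResolutionRules

  // Hook: clones of the session state are created through the same builder.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```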
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hvanhovell@databricks.com>
Closes #17433 from hvanhovell/SPARK-20100.
Diffstat (limited to 'sql/hive')
4 files changed, 152 insertions(+), 355 deletions(-)
```diff
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 6b7599e3d3..2cc20a791d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
@@ -47,14 +47,16 @@ private[sql] class HiveSessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
-    parser: ParserInterface)
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
   extends SessionCatalog(
       externalCatalog,
       globalTempViewManager,
       functionRegistry,
      conf,
      hadoopConf,
-      parser) {
+      parser,
+      functionResourceLoader) {
 
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
@@ -69,47 +71,6 @@ private[sql] class HiveSessionCatalog(
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
 
-  /**
-   * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and
-   * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
-   */
-  def newSessionCatalogWith(
-      newSparkSession: SparkSession,
-      conf: SQLConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): HiveSessionCatalog = {
-    val catalog = HiveSessionCatalog(
-      newSparkSession,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-
-    synchronized {
-      catalog.currentDb = currentDb
-      // copy over temporary tables
-      tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
-    }
-
-    catalog
-  }
-
-  /**
-   * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add
-   * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However,
-   * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we can a new version of
-   * `newSessionCatalogWith` and disable this one.
-   *
-   * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly.
-   */
-  override def newSessionCatalogWith(
-      conf: CatalystConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException(
-    "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession")
-
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
     val key = metastoreCatalog.getQualifiedTableName(table)
@@ -250,28 +211,3 @@ private[sql] class HiveSessionCatalog(
     "histogram_numeric"
   )
 }
-
-private[sql] object HiveSessionCatalog {
-
-  def apply(
-      sparkSession: SparkSession,
-      functionRegistry: FunctionRegistry,
-      conf: SQLConf,
-      hadoopConf: Configuration,
-      parser: ParserInterface): HiveSessionCatalog = {
-    // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
-    // essentially a cache for metastore tables. However, it relies on a lot of session-specific
-    // things so it would be a lot of work to split its functionality between HiveSessionCatalog
-    // and HiveCatalog. We should still do it at some point...
-    val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
-
-    new HiveSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      metastoreCatalog,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-  }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index cb8bcb8591..49ff8478f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,20 +18,23 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.streaming.StreamingQueryManager
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
+ *
  * @param sparkContext The [[SparkContext]].
  * @param sharedState The shared state.
  * @param conf SQL-specific key-value configurations.
@@ -40,12 +43,14 @@ import org.apache.spark.sql.streaming.StreamingQueryManager
  * @param catalog Internal catalog for managing table and database states that uses Hive client for
  *                interacting with the metastore.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param metadataHive The Hive metadata client.
  * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *                              [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
- * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans and that takes
+ *                Hive-specific strategies into account.
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
+ * @param metadataHive The Hive metadata client.
  */
 private[hive] class HiveSessionState(
     sparkContext: SparkContext,
@@ -55,11 +60,13 @@
     functionRegistry: FunctionRegistry,
     override val catalog: HiveSessionCatalog,
     sqlParser: ParserInterface,
-    val metadataHive: HiveClient,
     analyzer: Analyzer,
+    optimizer: Optimizer,
+    planner: SparkPlanner,
     streamingQueryManager: StreamingQueryManager,
-    queryExecutionCreator: LogicalPlan => QueryExecution,
-    val plannerCreator: () => SparkPlanner)
+    createQueryExecution: LogicalPlan => QueryExecution,
+    createClone: (SparkSession, SessionState) => SessionState,
+    val metadataHive: HiveClient)
   extends SessionState(
     sparkContext,
     sharedState,
@@ -69,14 +76,11 @@
     catalog,
     sqlParser,
     analyzer,
+    optimizer,
+    planner,
     streamingQueryManager,
-    queryExecutionCreator) { self =>
-
-  /**
-   * Planner that takes into account Hive-specific strategies.
-   */
-  override def planner: SparkPlanner = plannerCreator()
-
+    createQueryExecution,
+    createClone) {
 
   // ------------------------------------------------------
   //  Helper methods, partially leftover from pre-2.0 days
@@ -121,150 +125,115 @@ private[hive] class HiveSessionState(
   def hiveThriftServerAsync: Boolean = {
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
+}
 
+private[hive] object HiveSessionState {
   /**
-   * Get an identical copy of the `HiveSessionState`.
-   * This should ideally reuse the `SessionState.clone` but cannot do so.
-   * Doing that will throw an exception when trying to clone the catalog.
+   * Create a new [[HiveSessionState]] for the given session.
    */
-  override def clone(newSparkSession: SparkSession): HiveSessionState = {
-    val sparkContext = newSparkSession.sparkContext
-    val confCopy = conf.clone()
-    val functionRegistryCopy = functionRegistry.clone()
-    val experimentalMethodsCopy = experimentalMethods.clone()
-    val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-    val catalogCopy = catalog.newSessionCatalogWith(
-      newSparkSession,
-      confCopy,
-      SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
-      functionRegistryCopy,
-      sqlParser)
-    val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
-    val hiveClient =
-      newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-        .newSession()
-
-    SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
-    new HiveSessionState(
-      sparkContext,
-      newSparkSession.sharedState,
-      confCopy,
-      experimentalMethodsCopy,
-      functionRegistryCopy,
-      catalogCopy,
-      sqlParser,
-      hiveClient,
-      HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
-      new StreamingQueryManager(newSparkSession),
-      queryExecutionCreator,
-      HiveSessionState.createPlannerCreator(
-        newSparkSession,
-        confCopy,
-        experimentalMethodsCopy))
+  def apply(session: SparkSession): HiveSessionState = {
+    new HiveSessionStateBuilder(session).build()
   }
-
 }
 
-private[hive] object HiveSessionState {
-
-  def apply(sparkSession: SparkSession): HiveSessionState = {
-    apply(sparkSession, new SQLConf)
-  }
-
-  def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = {
-    val initHelper = SessionState(sparkSession, conf)
-
-    val sparkContext = sparkSession.sparkContext
-
-    val catalog = HiveSessionCatalog(
-      sparkSession,
-      initHelper.functionRegistry,
-      initHelper.conf,
-      SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
-      initHelper.sqlParser)
-
-    val metadataHive: HiveClient =
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-        .newSession()
-
-    val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+/**
+ * Builder that produces a [[HiveSessionState]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None)
+  extends BaseSessionStateBuilder(session, parentState) {
 
-    val plannerCreator = createPlannerCreator(
-      sparkSession,
-      initHelper.conf,
-      initHelper.experimentalMethods)
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
-    val hiveSessionState = new HiveSessionState(
-      sparkContext,
-      sparkSession.sharedState,
-      initHelper.conf,
-      initHelper.experimentalMethods,
-      initHelper.functionRegistry,
-      catalog,
-      initHelper.sqlParser,
-      metadataHive,
-      analyzer,
-      initHelper.streamingQueryManager,
-      initHelper.queryExecutionCreator,
-      plannerCreator)
-    catalog.functionResourceLoader = hiveSessionState.functionResourceLoader
-    hiveSessionState
+  /**
+   * Create a [[HiveSessionCatalog]].
+   */
+  override protected lazy val catalog: HiveSessionCatalog = {
+    val catalog = new HiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      new HiveMetastoreCatalog(session),
+      functionRegistry,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      new SessionFunctionResourceLoader(session))
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
   }
 
   /**
-   * Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
+   * A logical query plan `Analyzer` with rules specific to Hive.
    */
-  private def createAnalyzer(
-      sparkSession: SparkSession,
-      catalog: HiveSessionCatalog,
-      sqlConf: SQLConf): Analyzer = {
-    new Analyzer(catalog, sqlConf) {
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(sparkSession) ::
-        new FindDataSourceTable(sparkSession) ::
-        new ResolveSQLOnFile(sparkSession) :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(sparkSession) ::
-        catalog.ParquetConversions ::
-        catalog.OrcConversions ::
-        PreprocessTableCreation(sparkSession) ::
-        PreprocessTableInsertion(sqlConf) ::
-        DataSourceAnalysis(sqlConf) ::
-        HiveAnalysis :: Nil
+  override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+      new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        customResolutionRules
+
+    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+      new DetermineTableStats(session) +:
+        catalog.ParquetConversions +:
+        catalog.OrcConversions +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+
+    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck +:
+        customCheckRules
+  }
 
-      override val extendedCheckRules = Seq(PreWriteCheck)
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override protected def planner: SparkPlanner = {
+    new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies {
+      override val sparkSession: SparkSession = session
+
+      override def extraPlanningStrategies: Seq[Strategy] =
+        super.extraPlanningStrategies ++ customPlanningStrategies
+
+      override def strategies: Seq[Strategy] = {
+        experimentalMethods.extraStrategies ++
+          extraPlanningStrategies ++ Seq(
+          FileSourceStrategy,
+          DataSourceStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          Scripts,
+          Aggregation,
+          JoinSelection,
+          BasicOperators
+        )
+      }
     }
   }
 
-  private def createPlannerCreator(
-      associatedSparkSession: SparkSession,
-      sqlConf: SQLConf,
-      experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
-    () =>
-      new SparkPlanner(
-        associatedSparkSession.sparkContext,
-        sqlConf,
-        experimentalMethods.extraStrategies)
-      with HiveStrategies {
-
-        override val sparkSession: SparkSession = associatedSparkSession
+  override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
 
-        override def strategies: Seq[Strategy] = {
-          experimentalMethods.extraStrategies ++ Seq(
-            FileSourceStrategy,
-            DataSourceStrategy,
-            SpecialLimits,
-            InMemoryScans,
-            HiveTableScans,
-            Scripts,
-            Aggregation,
-            JoinSelection,
-            BasicOperators
-          )
-        }
-      }
+  override def build(): HiveSessionState = {
+    val metadataHive: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionState(
+      session.sparkContext,
+      session.sharedState,
+      conf,
+      experimentalMethods,
+      functionRegistry,
+      catalog,
+      sqlParser,
+      analyzer,
+      optimizer,
+      planner,
+      streamingQueryManager,
+      createQueryExecution,
+      createClone,
+      metadataHive)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b63ed76967..32ca69605e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -148,12 +148,14 @@ class TestHiveContext(
  *
  * @param sc SparkContext
  * @param existingSharedState optional [[SharedState]]
+ * @param parentSessionState optional parent [[SessionState]]
  * @param loadTestTables if true, load the test tables. They can only be loaded when running
  *                       in the JVM, i.e when calling from Python this flag has to be false.
  */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     @transient private val existingSharedState: Option[TestHiveSharedState],
+    @transient private val parentSessionState: Option[HiveSessionState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
@@ -161,6 +163,7 @@ private[hive] class TestHiveSparkSession(
     this(
       sc,
       existingSharedState = None,
+      parentSessionState = None,
       loadTestTables)
   }
 
@@ -168,6 +171,7 @@ private[hive] class TestHiveSparkSession(
     this(
       sc,
       existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+      parentSessionState = None,
       loadTestTables)
   }
 
@@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession(
 
   @transient
   override lazy val sessionState: HiveSessionState = {
-    val testConf =
-      new SQLConf {
-        clear()
-        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-        override def clear(): Unit = {
-          super.clear()
-          TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
-        }
-      }
-    val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan)
-    val initHelper = HiveSessionState(this, testConf)
-    SessionState.mergeSparkConf(testConf, sparkContext.getConf)
-
-    new HiveSessionState(
-      sparkContext,
-      sharedState,
-      testConf,
-      initHelper.experimentalMethods,
-      initHelper.functionRegistry,
-      initHelper.catalog,
-      initHelper.sqlParser,
-      initHelper.metadataHive,
-      initHelper.analyzer,
-      initHelper.streamingQueryManager,
-      queryExecutionCreator,
-      initHelper.plannerCreator)
+    new TestHiveSessionStateBuilder(this, parentSessionState).build()
   }
 
   override def newSession(): TestHiveSparkSession = {
-    new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
+    new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
+  }
+
+  override def cloneSession(): SparkSession = {
+    val result = new TestHiveSparkSession(
+      sparkContext,
+      Some(sharedState),
+      Some(sessionState),
+      loadTestTables)
+    result.sessionState // force copy of SessionState
+    result
   }
 
   private var cacheTables: Boolean = false
@@ -595,3 +584,18 @@ private[hive] object TestHiveContext {
   }
 
 }
+
+private[sql] class TestHiveSessionStateBuilder(
+    session: SparkSession,
+    state: Option[SessionState])
+  extends HiveSessionStateBuilder(session, state)
+  with WithTestConf {
+
+  override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs
+
+  override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
+    new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
+  }
+
+  override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
deleted file mode 100644
index 3b0f59b159..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.Range
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-class HiveSessionCatalogSuite extends TestHiveSingleton {
-
-  test("clone HiveSessionCatalog") {
-    val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-
-    val tempTableName1 = "copytest1"
-    val tempTableName2 = "copytest2"
-    try {
-      val tempTable1 = Range(1, 10, 1, 10)
-      original.createTempView(tempTableName1, tempTable1, overrideIfExists = false)
-
-      // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        spark,
-        new SQLConf,
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
-      assert(original ne clone)
-      assert(clone.getTempView(tempTableName1) == Some(tempTable1))
-
-      // check if clone and original independent
-      clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false)
-      assert(original.getTempView(tempTableName1) == Some(tempTable1))
-
-      val tempTable2 = Range(1, 20, 2, 10)
-      original.createTempView(tempTableName2, tempTable2, overrideIfExists = false)
-      assert(clone.getTempView(tempTableName2).isEmpty)
-    } finally {
-      // Drop the created temp views from the global singleton HiveSession.
-      original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true)
-      original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true)
-    }
-  }
-
-  test("clone SessionCatalog - current db") {
-    val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-    val originalCurrentDatabase = original.getCurrentDatabase
-    val db1 = "db1"
-    val db2 = "db2"
-    val db3 = "db3"
-    try {
-      original.createDatabase(newDb(db1), ignoreIfExists = true)
-      original.createDatabase(newDb(db2), ignoreIfExists = true)
-      original.createDatabase(newDb(db3), ignoreIfExists = true)
-
-      original.setCurrentDatabase(db1)
-
-      // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        spark,
-        new SQLConf,
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
-
-      // check if current db copied over
-      assert(original ne clone)
-      assert(clone.getCurrentDatabase == db1)
-
-      // check if clone and original independent
-      clone.setCurrentDatabase(db2)
-      assert(original.getCurrentDatabase == db1)
-      original.setCurrentDatabase(db3)
-      assert(clone.getCurrentDatabase == db2)
-    } finally {
-      // Drop the created databases from the global singleton HiveSession.
-      original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true)
-      original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true)
-      original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true)
-      original.setCurrentDatabase(originalCurrentDatabase)
-    }
-  }
-
-  def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
-
-  def newDb(name: String): CatalogDatabase = {
-    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
-  }
-}
```
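For context, a minimal usage sketch of the new entry point (not part of the patch, and assuming a local Hive-enabled session): constructing the session state now just means instantiating a builder and calling `build()`, which is exactly what `HiveSessionState.apply` above does.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveSessionStateBuilder

object BuilderUsageSketch {
  def main(args: Array[String]): Unit = {
    // Local, Hive-enabled session; master/config chosen only for the example.
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    // Equivalent to HiveSessionState.apply(spark): the builder wires up the
    // catalog, analyzer, optimizer, planner, and the Hive metadata client.
    val state = new HiveSessionStateBuilder(spark).build()
    println(state.catalog.getCurrentDatabase) // e.g. "default"
    spark.stop()
  }
}
```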