path: root/sql/hive
author     Kunal Khamar <kkhamar@outlook.com>      2017-03-08 13:06:22 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2017-03-08 13:20:45 -0800
commit     6570cfd7abe349dc6d2151f2ac9dc662e7465a79 (patch)
tree       97b54a89a3d228c737203989d6b68db5ec75d8ef /sql/hive
parent     1bf9012380de2aa7bdf39220b55748defde8b700 (diff)
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
Forking a newSession() from a SparkSession currently creates a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions, etc.). This change adds a cloneSession() method that creates a new SparkSession with a copy of the parent's SessionState.

Subsequent changes to the base session are not propagated to the cloned session; the clone is independent after creation. If the base session is changed after the clone has been created, say the user registers a new UDF, then the new UDF will not be available inside the clone. The same goes for configs and temp tables.

Tested with unit tests.

Author: Kunal Khamar <kkhamar@outlook.com>
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16826 from kunalkhamar/fork-sparksession.
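To make the semantics above concrete, here is a minimal usage sketch. It is illustrative only: cloneSession() is introduced by this patch and may not be exposed as public API, so the call is assumed to be made from code that can see it (e.g. a test in the org.apache.spark.sql package); the session setup and view names are made up.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[2]").appName("clone-demo").getOrCreate()
  spark.conf.set("spark.sql.shuffle.partitions", "7")
  spark.range(10).createOrReplaceTempView("base_view")

  // cloneSession() copies the parent's SessionState: SQL conf, temp views, registered functions.
  val cloned = spark.cloneSession()
  assert(cloned.conf.get("spark.sql.shuffle.partitions") == "7")
  assert(cloned.catalog.tableExists("base_view"))

  // The clone is independent: changes to the base session after cloning are not visible in it.
  spark.range(5).createOrReplaceTempView("later_view")
  assert(!cloned.catalog.tableExists("later_view"))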
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    |   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala      |  92
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala        | 261
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala   |   2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala           |  67
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala | 112
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala   |  41
7 files changed, 460 insertions(+), 120 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4d3b6c3cec..d135dfa9f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,8 +41,9 @@ import org.apache.spark.sql.types._
* cleaned up to integrate more nicely with [[HiveExternalCatalog]].
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
- private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
- private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+ // these are def_s and not val/lazy val since the latter would introduce circular references
+ private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+ private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f1ea86890c..6b7599e3d3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
@@ -43,31 +43,23 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
- sparkSession: SparkSession,
- functionResourceLoader: FunctionResourceLoader,
+ private val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface)
extends SessionCatalog(
- externalCatalog,
- globalTempViewManager,
- functionResourceLoader,
- functionRegistry,
- conf,
- hadoopConf,
- parser) {
+ externalCatalog,
+ globalTempViewManager,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser) {
// ----------------------------------------------------------------
// | Methods and fields for interacting with HiveMetastoreCatalog |
// ----------------------------------------------------------------
- // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
- // essentially a cache for metastore tables. However, it relies on a lot of session-specific
- // things so it would be a lot of work to split its functionality between HiveSessionCatalog
- // and HiveCatalog. We should still do it at some point...
- private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
-
// These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
// `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
@@ -77,10 +69,51 @@ private[sql] class HiveSessionCatalog(
metastoreCatalog.hiveDefaultTableFilePath(name)
}
+ /**
+ * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and
+ * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
+ */
+ def newSessionCatalogWith(
+ newSparkSession: SparkSession,
+ conf: SQLConf,
+ hadoopConf: Configuration,
+ functionRegistry: FunctionRegistry,
+ parser: ParserInterface): HiveSessionCatalog = {
+ val catalog = HiveSessionCatalog(
+ newSparkSession,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser)
+
+ synchronized {
+ catalog.currentDb = currentDb
+ // copy over temporary tables
+ tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+ }
+
+ catalog
+ }
+
+ /**
+ * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add
+ * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However,
+ * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we add a new version of
+ * `newSessionCatalogWith` and disable this one.
+ *
+ * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly.
+ */
+ override def newSessionCatalogWith(
+ conf: CatalystConf,
+ hadoopConf: Configuration,
+ functionRegistry: FunctionRegistry,
+ parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException(
+ "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession")
+
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = metastoreCatalog.getQualifiedTableName(table)
- sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key)
+ tableRelationCache.getIfPresent(key)
}
override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
@@ -217,3 +250,28 @@ private[sql] class HiveSessionCatalog(
"histogram_numeric"
)
}
+
+private[sql] object HiveSessionCatalog {
+
+ def apply(
+ sparkSession: SparkSession,
+ functionRegistry: FunctionRegistry,
+ conf: SQLConf,
+ hadoopConf: Configuration,
+ parser: ParserInterface): HiveSessionCatalog = {
+ // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
+ // essentially a cache for metastore tables. However, it relies on a lot of session-specific
+ // things so it would be a lot of work to split its functionality between HiveSessionCatalog
+ // and HiveCatalog. We should still do it at some point...
+ val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+
+ new HiveSessionCatalog(
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
+ metastoreCatalog,
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser)
+ }
+}
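The newSessionCatalogWith overload added above is exercised end-to-end by the new HiveSessionCatalogSuite later in this patch. As a condensed sketch of the call (assuming code compiled in the org.apache.spark.sql.hive package and a Hive-enabled spark session such as the TestHive singleton):

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
  import org.apache.spark.sql.internal.SQLConf

  val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
  // externalCatalog and globalTempViewManager are inherited; currentDb and tempTables are copied.
  val clone = original.newSessionCatalogWith(
    spark, new SQLConf, new Configuration(), new SimpleFunctionRegistry, CatalystSqlParser)
  // From here on, the original catalog and the clone evolve independently (see the tests below).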
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 5a08a6bc66..cb8bcb8591 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,89 +17,65 @@
package org.apache.spark.sql.hive
+import org.apache.spark.SparkContext
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.streaming.StreamingQueryManager
/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
+ * @param sparkContext The [[SparkContext]].
+ * @param sharedState The shared state.
+ * @param conf SQL-specific key-value configurations.
+ * @param experimentalMethods The experimental methods.
+ * @param functionRegistry Internal catalog for managing functions registered by the user.
+ * @param catalog Internal catalog for managing table and database states that uses Hive client for
+ * interacting with the metastore.
+ * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+ * @param metadataHive The Hive metadata client.
+ * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
+ * @param streamingQueryManager Interface to start and stop
+ * [[org.apache.spark.sql.streaming.StreamingQuery]]s.
+ * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
+ * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
*/
-private[hive] class HiveSessionState(sparkSession: SparkSession)
- extends SessionState(sparkSession) {
-
- self =>
-
- /**
- * A Hive client used for interacting with the metastore.
- */
- lazy val metadataHive: HiveClient =
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
-
- /**
- * Internal catalog for managing table and database states.
- */
- override lazy val catalog = {
- new HiveSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
+private[hive] class HiveSessionState(
+ sparkContext: SparkContext,
+ sharedState: SharedState,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods,
+ functionRegistry: FunctionRegistry,
+ override val catalog: HiveSessionCatalog,
+ sqlParser: ParserInterface,
+ val metadataHive: HiveClient,
+ analyzer: Analyzer,
+ streamingQueryManager: StreamingQueryManager,
+ queryExecutionCreator: LogicalPlan => QueryExecution,
+ val plannerCreator: () => SparkPlanner)
+ extends SessionState(
+ sparkContext,
+ sharedState,
conf,
- newHadoopConf(),
- sqlParser)
- }
-
- /**
- * An analyzer that uses the Hive metastore.
- */
- override lazy val analyzer: Analyzer = {
- new Analyzer(catalog, conf) {
- override val extendedResolutionRules =
- new ResolveHiveSerdeTable(sparkSession) ::
- new FindDataSourceTable(sparkSession) ::
- new ResolveSQLOnFile(sparkSession) :: Nil
-
- override val postHocResolutionRules =
- new DetermineTableStats(sparkSession) ::
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- PreprocessTableCreation(sparkSession) ::
- PreprocessTableInsertion(conf) ::
- DataSourceAnalysis(conf) ::
- HiveAnalysis :: Nil
-
- override val extendedCheckRules = Seq(PreWriteCheck)
- }
- }
+ experimentalMethods,
+ functionRegistry,
+ catalog,
+ sqlParser,
+ analyzer,
+ streamingQueryManager,
+ queryExecutionCreator) { self =>
/**
* Planner that takes into account Hive-specific strategies.
*/
- override def planner: SparkPlanner = {
- new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
- with HiveStrategies {
- override val sparkSession: SparkSession = self.sparkSession
-
- override def strategies: Seq[Strategy] = {
- experimentalMethods.extraStrategies ++ Seq(
- FileSourceStrategy,
- DataSourceStrategy,
- SpecialLimits,
- InMemoryScans,
- HiveTableScans,
- Scripts,
- Aggregation,
- JoinSelection,
- BasicOperators
- )
- }
- }
- }
+ override def planner: SparkPlanner = plannerCreator()
// ------------------------------------------------------
@@ -146,4 +122,149 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
+ /**
+ * Get an identical copy of the `HiveSessionState`.
+ * This should ideally reuse the `SessionState.clone` but cannot do so.
+ * Doing that will throw an exception when trying to clone the catalog.
+ */
+ override def clone(newSparkSession: SparkSession): HiveSessionState = {
+ val sparkContext = newSparkSession.sparkContext
+ val confCopy = conf.clone()
+ val functionRegistryCopy = functionRegistry.clone()
+ val experimentalMethodsCopy = experimentalMethods.clone()
+ val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+ val catalogCopy = catalog.newSessionCatalogWith(
+ newSparkSession,
+ confCopy,
+ SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
+ functionRegistryCopy,
+ sqlParser)
+ val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
+
+ val hiveClient =
+ newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ .newSession()
+
+ SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+ new HiveSessionState(
+ sparkContext,
+ newSparkSession.sharedState,
+ confCopy,
+ experimentalMethodsCopy,
+ functionRegistryCopy,
+ catalogCopy,
+ sqlParser,
+ hiveClient,
+ HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+ new StreamingQueryManager(newSparkSession),
+ queryExecutionCreator,
+ HiveSessionState.createPlannerCreator(
+ newSparkSession,
+ confCopy,
+ experimentalMethodsCopy))
+ }
+
+}
+
+private[hive] object HiveSessionState {
+
+ def apply(sparkSession: SparkSession): HiveSessionState = {
+ apply(sparkSession, new SQLConf)
+ }
+
+ def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = {
+ val initHelper = SessionState(sparkSession, conf)
+
+ val sparkContext = sparkSession.sparkContext
+
+ val catalog = HiveSessionCatalog(
+ sparkSession,
+ initHelper.functionRegistry,
+ initHelper.conf,
+ SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
+ initHelper.sqlParser)
+
+ val metadataHive: HiveClient =
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ .newSession()
+
+ val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+
+ val plannerCreator = createPlannerCreator(
+ sparkSession,
+ initHelper.conf,
+ initHelper.experimentalMethods)
+
+ val hiveSessionState = new HiveSessionState(
+ sparkContext,
+ sparkSession.sharedState,
+ initHelper.conf,
+ initHelper.experimentalMethods,
+ initHelper.functionRegistry,
+ catalog,
+ initHelper.sqlParser,
+ metadataHive,
+ analyzer,
+ initHelper.streamingQueryManager,
+ initHelper.queryExecutionCreator,
+ plannerCreator)
+ catalog.functionResourceLoader = hiveSessionState.functionResourceLoader
+ hiveSessionState
+ }
+
+ /**
+ * Create a logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
+ */
+ private def createAnalyzer(
+ sparkSession: SparkSession,
+ catalog: HiveSessionCatalog,
+ sqlConf: SQLConf): Analyzer = {
+ new Analyzer(catalog, sqlConf) {
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new ResolveHiveSerdeTable(sparkSession) ::
+ new FindDataSourceTable(sparkSession) ::
+ new ResolveSQLOnFile(sparkSession) :: Nil
+
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ new DetermineTableStats(sparkSession) ::
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ PreprocessTableCreation(sparkSession) ::
+ PreprocessTableInsertion(sqlConf) ::
+ DataSourceAnalysis(sqlConf) ::
+ HiveAnalysis :: Nil
+
+ override val extendedCheckRules = Seq(PreWriteCheck)
+ }
+ }
+
+ private def createPlannerCreator(
+ associatedSparkSession: SparkSession,
+ sqlConf: SQLConf,
+ experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+ () =>
+ new SparkPlanner(
+ associatedSparkSession.sparkContext,
+ sqlConf,
+ experimentalMethods.extraStrategies)
+ with HiveStrategies {
+
+ override val sparkSession: SparkSession = associatedSparkSession
+
+ override def strategies: Seq[Strategy] = {
+ experimentalMethods.extraStrategies ++ Seq(
+ FileSourceStrategy,
+ DataSourceStrategy,
+ SpecialLimits,
+ InMemoryScans,
+ HiveTableScans,
+ Scripts,
+ Aggregation,
+ JoinSelection,
+ BasicOperators
+ )
+ }
+ }
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 469c9d84de..6e1f429286 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -278,6 +278,8 @@ private[hive] class HiveClientImpl(
state.getConf.setClassLoader(clientLoader.classLoader)
// Set the thread local metastore client to the client associated with this HiveClientImpl.
Hive.set(client)
+ // Replace conf in the thread local Hive with current conf
+ Hive.get(conf)
// setCurrentSessionState will use the classLoader associated
// with the HiveConf in `state` to override the context class loader of the current
// thread.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index efc2f00984..076c40d459 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -30,16 +30,17 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -84,7 +85,7 @@ class TestHiveContext(
new TestHiveContext(sparkSession.newSession())
}
- override def sessionState: TestHiveSessionState = sparkSession.sessionState
+ override def sessionState: HiveSessionState = sparkSession.sessionState
def setCacheTables(c: Boolean): Unit = {
sparkSession.setCacheTables(c)
@@ -144,11 +145,35 @@ private[hive] class TestHiveSparkSession(
existingSharedState.getOrElse(new SharedState(sc))
}
- // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
- // logic based on the setting of CATALOG_IMPLEMENTATION.
@transient
- override lazy val sessionState: TestHiveSessionState =
- new TestHiveSessionState(self)
+ override lazy val sessionState: HiveSessionState = {
+ val testConf =
+ new SQLConf {
+ clear()
+ override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+ override def clear(): Unit = {
+ super.clear()
+ TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+ }
+ }
+ val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan)
+ val initHelper = HiveSessionState(this, testConf)
+ SessionState.mergeSparkConf(testConf, sparkContext.getConf)
+
+ new HiveSessionState(
+ sparkContext,
+ sharedState,
+ testConf,
+ initHelper.experimentalMethods,
+ initHelper.functionRegistry,
+ initHelper.catalog,
+ initHelper.sqlParser,
+ initHelper.metadataHive,
+ initHelper.analyzer,
+ initHelper.streamingQueryManager,
+ queryExecutionCreator,
+ initHelper.plannerCreator)
+ }
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
@@ -492,26 +517,6 @@ private[hive] class TestHiveQueryExecution(
}
}
-private[hive] class TestHiveSessionState(
- sparkSession: TestHiveSparkSession)
- extends HiveSessionState(sparkSession) { self =>
-
- override lazy val conf: SQLConf = {
- new SQLConf {
- clear()
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
- override def clear(): Unit = {
- super.clear()
- TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
- }
- }
- }
-
- override def executePlan(plan: LogicalPlan): TestHiveQueryExecution = {
- new TestHiveQueryExecution(sparkSession, plan)
- }
-}
-
private[hive] object TestHiveContext {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
new file mode 100644
index 0000000000..3b0f59b159
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+class HiveSessionCatalogSuite extends TestHiveSingleton {
+
+ test("clone HiveSessionCatalog") {
+ val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
+
+ val tempTableName1 = "copytest1"
+ val tempTableName2 = "copytest2"
+ try {
+ val tempTable1 = Range(1, 10, 1, 10)
+ original.createTempView(tempTableName1, tempTable1, overrideIfExists = false)
+
+ // check if tables copied over
+ val clone = original.newSessionCatalogWith(
+ spark,
+ new SQLConf,
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+ assert(original ne clone)
+ assert(clone.getTempView(tempTableName1) == Some(tempTable1))
+
+ // check if clone and original independent
+ clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false)
+ assert(original.getTempView(tempTableName1) == Some(tempTable1))
+
+ val tempTable2 = Range(1, 20, 2, 10)
+ original.createTempView(tempTableName2, tempTable2, overrideIfExists = false)
+ assert(clone.getTempView(tempTableName2).isEmpty)
+ } finally {
+ // Drop the created temp views from the global singleton HiveSession.
+ original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true)
+ original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true)
+ }
+ }
+
+ test("clone SessionCatalog - current db") {
+ val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
+ val originalCurrentDatabase = original.getCurrentDatabase
+ val db1 = "db1"
+ val db2 = "db2"
+ val db3 = "db3"
+ try {
+ original.createDatabase(newDb(db1), ignoreIfExists = true)
+ original.createDatabase(newDb(db2), ignoreIfExists = true)
+ original.createDatabase(newDb(db3), ignoreIfExists = true)
+
+ original.setCurrentDatabase(db1)
+
+ // check if tables copied over
+ val clone = original.newSessionCatalogWith(
+ spark,
+ new SQLConf,
+ new Configuration(),
+ new SimpleFunctionRegistry,
+ CatalystSqlParser)
+
+ // check if current db copied over
+ assert(original ne clone)
+ assert(clone.getCurrentDatabase == db1)
+
+ // check if clone and original independent
+ clone.setCurrentDatabase(db2)
+ assert(original.getCurrentDatabase == db1)
+ original.setCurrentDatabase(db3)
+ assert(clone.getCurrentDatabase == db2)
+ } finally {
+ // Drop the created databases from the global singleton HiveSession.
+ original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true)
+ original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true)
+ original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true)
+ original.setCurrentDatabase(originalCurrentDatabase)
+ }
+ }
+
+ def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
+
+ def newDb(name: String): CatalogDatabase = {
+ CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
new file mode 100644
index 0000000000..67c77fb62f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+/**
+ * Run all tests from `SessionStateSuite` with a `HiveSessionState`.
+ */
+class HiveSessionStateSuite extends SessionStateSuite
+ with TestHiveSingleton with BeforeAndAfterEach {
+
+ override def beforeAll(): Unit = {
+ // Reuse the singleton session
+ activeSession = spark
+ }
+
+ override def afterAll(): Unit = {
+ // Set activeSession to null to avoid stopping the singleton session
+ activeSession = null
+ super.afterAll()
+ }
+}