author    Herman van Hovell <hvanhovell@databricks.com>  2017-03-28 10:07:24 +0800
committer Wenchen Fan <wenchen@databricks.com>  2017-03-28 10:07:24 +0800
commit    ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 (patch)
tree      f3014ba709d54b48172a399708074480a6ed9661 /sql/hive
parent    8a6f33f0483dcee81467e6374a796b5dbd53ea30 (diff)
[SPARK-20100][SQL] Refactor SessionState initialization
## What changes were proposed in this pull request?

The current SessionState initialization code path is quite complex. Part of the creation is done in the SessionState companion objects, part is done inside the SessionState class itself, and part is done by passing functions around. This PR refactors that code path and consolidates SessionState initialization into a builder class. The SessionState no longer performs any initialization itself and simply becomes a placeholder for the various Spark SQL internals.

This also lays the groundwork for two future improvements:

1. It provides a starting point for removing `HiveSessionState`. Removing `HiveSessionState` would also require moving resource loading into a separate class and (re)moving metadata hive.
2. It makes it easier to customize the Spark Session. Currently you need to create a custom version of the builder; hooks have been added to facilitate this. A future step will be to create a semi-stable API on top of this.

## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17433 from hvanhovell/SPARK-20100.
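The customization hooks mentioned above are the ones consumed by `HiveSessionStateBuilder` in the diff below (`customResolutionRules`, `customPostHocResolutionRules`, `customCheckRules`, `customPlanningStrategies`, `newBuilder`). As an illustration only, here is a minimal sketch of what a custom builder might look like; `MySessionStateBuilder` and `NoopRule` are hypothetical names, and the sketch assumes the hooks referenced in the diff are overridable protected members of `BaseSessionStateBuilder`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveSessionStateBuilder
import org.apache.spark.sql.internal.SessionState

// Hypothetical rule used purely for illustration; it leaves the plan unchanged.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical custom builder that reuses the Hive builder and only injects one
// extra resolution rule through the hook consumed by the analyzer in the diff below.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends HiveSessionStateBuilder(session, parentState) {

  // Extra rules are picked up via `customResolutionRules` in the overridden analyzer.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    NoopRule +: super.customResolutionRules

  // Ensures that cloned sessions (see `createClone`) are built with this builder too.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```

How such a builder is registered with a SparkSession is not part of this change; as the description notes, a semi-stable API on top of these hooks is left for a future step.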
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala      |  76
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala        | 259
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala           |  60
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala | 112
4 files changed, 152 insertions, 355 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 6b7599e3d3..2cc20a791d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
@@ -47,14 +47,16 @@ private[sql] class HiveSessionCatalog(
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
- parser: ParserInterface)
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionRegistry,
conf,
hadoopConf,
- parser) {
+ parser,
+ functionResourceLoader) {
// ----------------------------------------------------------------
// | Methods and fields for interacting with HiveMetastoreCatalog |
@@ -69,47 +71,6 @@ private[sql] class HiveSessionCatalog(
metastoreCatalog.hiveDefaultTableFilePath(name)
}
- /**
- * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and
- * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
- */
- def newSessionCatalogWith(
- newSparkSession: SparkSession,
- conf: SQLConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): HiveSessionCatalog = {
- val catalog = HiveSessionCatalog(
- newSparkSession,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
-
- synchronized {
- catalog.currentDb = currentDb
- // copy over temporary tables
- tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
- }
-
- catalog
- }
-
- /**
- * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add
- * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However,
- * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we can a new version of
- * `newSessionCatalogWith` and disable this one.
- *
- * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly.
- */
- override def newSessionCatalogWith(
- conf: CatalystConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException(
- "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession")
-
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = metastoreCatalog.getQualifiedTableName(table)
@@ -250,28 +211,3 @@ private[sql] class HiveSessionCatalog(
"histogram_numeric"
)
}
-
-private[sql] object HiveSessionCatalog {
-
- def apply(
- sparkSession: SparkSession,
- functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hadoopConf: Configuration,
- parser: ParserInterface): HiveSessionCatalog = {
- // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
- // essentially a cache for metastore tables. However, it relies on a lot of session-specific
- // things so it would be a lot of work to split its functionality between HiveSessionCatalog
- // and HiveCatalog. We should still do it at some point...
- val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
-
- new HiveSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- metastoreCatalog,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index cb8bcb8591..49ff8478f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,20 +18,23 @@
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
import org.apache.spark.sql.streaming.StreamingQueryManager
/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
+ *
* @param sparkContext The [[SparkContext]].
* @param sharedState The shared state.
* @param conf SQL-specific key-value configurations.
@@ -40,12 +43,14 @@ import org.apache.spark.sql.streaming.StreamingQueryManager
* @param catalog Internal catalog for managing table and database states that uses Hive client for
* interacting with the metastore.
* @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param metadataHive The Hive metadata client.
* @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- * [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
- * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans and that takes
+ * Hive-specific strategies into account.
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
+ * @param metadataHive The Hive metadata client.
*/
private[hive] class HiveSessionState(
sparkContext: SparkContext,
@@ -55,11 +60,13 @@ private[hive] class HiveSessionState(
functionRegistry: FunctionRegistry,
override val catalog: HiveSessionCatalog,
sqlParser: ParserInterface,
- val metadataHive: HiveClient,
analyzer: Analyzer,
+ optimizer: Optimizer,
+ planner: SparkPlanner,
streamingQueryManager: StreamingQueryManager,
- queryExecutionCreator: LogicalPlan => QueryExecution,
- val plannerCreator: () => SparkPlanner)
+ createQueryExecution: LogicalPlan => QueryExecution,
+ createClone: (SparkSession, SessionState) => SessionState,
+ val metadataHive: HiveClient)
extends SessionState(
sparkContext,
sharedState,
@@ -69,14 +76,11 @@ private[hive] class HiveSessionState(
catalog,
sqlParser,
analyzer,
+ optimizer,
+ planner,
streamingQueryManager,
- queryExecutionCreator) { self =>
-
- /**
- * Planner that takes into account Hive-specific strategies.
- */
- override def planner: SparkPlanner = plannerCreator()
-
+ createQueryExecution,
+ createClone) {
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -121,150 +125,115 @@ private[hive] class HiveSessionState(
def hiveThriftServerAsync: Boolean = {
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
+}
+private[hive] object HiveSessionState {
/**
- * Get an identical copy of the `HiveSessionState`.
- * This should ideally reuse the `SessionState.clone` but cannot do so.
- * Doing that will throw an exception when trying to clone the catalog.
+ * Create a new [[HiveSessionState]] for the given session.
*/
- override def clone(newSparkSession: SparkSession): HiveSessionState = {
- val sparkContext = newSparkSession.sparkContext
- val confCopy = conf.clone()
- val functionRegistryCopy = functionRegistry.clone()
- val experimentalMethodsCopy = experimentalMethods.clone()
- val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
- val catalogCopy = catalog.newSessionCatalogWith(
- newSparkSession,
- confCopy,
- SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
- functionRegistryCopy,
- sqlParser)
- val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
- val hiveClient =
- newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .newSession()
-
- SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
- new HiveSessionState(
- sparkContext,
- newSparkSession.sharedState,
- confCopy,
- experimentalMethodsCopy,
- functionRegistryCopy,
- catalogCopy,
- sqlParser,
- hiveClient,
- HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
- new StreamingQueryManager(newSparkSession),
- queryExecutionCreator,
- HiveSessionState.createPlannerCreator(
- newSparkSession,
- confCopy,
- experimentalMethodsCopy))
+ def apply(session: SparkSession): HiveSessionState = {
+ new HiveSessionStateBuilder(session).build()
}
-
}
-private[hive] object HiveSessionState {
-
- def apply(sparkSession: SparkSession): HiveSessionState = {
- apply(sparkSession, new SQLConf)
- }
-
- def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = {
- val initHelper = SessionState(sparkSession, conf)
-
- val sparkContext = sparkSession.sparkContext
-
- val catalog = HiveSessionCatalog(
- sparkSession,
- initHelper.functionRegistry,
- initHelper.conf,
- SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
- initHelper.sqlParser)
-
- val metadataHive: HiveClient =
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .newSession()
-
- val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+/**
+ * Builder that produces a [[HiveSessionState]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None)
+ extends BaseSessionStateBuilder(session, parentState) {
- val plannerCreator = createPlannerCreator(
- sparkSession,
- initHelper.conf,
- initHelper.experimentalMethods)
+ private def externalCatalog: HiveExternalCatalog =
+ session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
- val hiveSessionState = new HiveSessionState(
- sparkContext,
- sparkSession.sharedState,
- initHelper.conf,
- initHelper.experimentalMethods,
- initHelper.functionRegistry,
- catalog,
- initHelper.sqlParser,
- metadataHive,
- analyzer,
- initHelper.streamingQueryManager,
- initHelper.queryExecutionCreator,
- plannerCreator)
- catalog.functionResourceLoader = hiveSessionState.functionResourceLoader
- hiveSessionState
+ /**
+ * Create a [[HiveSessionCatalog]].
+ */
+ override protected lazy val catalog: HiveSessionCatalog = {
+ val catalog = new HiveSessionCatalog(
+ externalCatalog,
+ session.sharedState.globalTempViewManager,
+ new HiveMetastoreCatalog(session),
+ functionRegistry,
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+ sqlParser,
+ new SessionFunctionResourceLoader(session))
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
}
/**
- * Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
+ * A logical query plan `Analyzer` with rules specific to Hive.
*/
- private def createAnalyzer(
- sparkSession: SparkSession,
- catalog: HiveSessionCatalog,
- sqlConf: SQLConf): Analyzer = {
- new Analyzer(catalog, sqlConf) {
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new ResolveHiveSerdeTable(sparkSession) ::
- new FindDataSourceTable(sparkSession) ::
- new ResolveSQLOnFile(sparkSession) :: Nil
-
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- new DetermineTableStats(sparkSession) ::
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- PreprocessTableCreation(sparkSession) ::
- PreprocessTableInsertion(sqlConf) ::
- DataSourceAnalysis(sqlConf) ::
- HiveAnalysis :: Nil
+ override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new ResolveHiveSerdeTable(session) +:
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ customResolutionRules
+
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ new DetermineTableStats(session) +:
+ catalog.ParquetConversions +:
+ catalog.OrcConversions +:
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ HiveAnalysis +:
+ customPostHocResolutionRules
+
+ override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+ PreWriteCheck +:
+ customCheckRules
+ }
- override val extendedCheckRules = Seq(PreWriteCheck)
+ /**
+ * Planner that takes into account Hive-specific strategies.
+ */
+ override protected def planner: SparkPlanner = {
+ new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies {
+ override val sparkSession: SparkSession = session
+
+ override def extraPlanningStrategies: Seq[Strategy] =
+ super.extraPlanningStrategies ++ customPlanningStrategies
+
+ override def strategies: Seq[Strategy] = {
+ experimentalMethods.extraStrategies ++
+ extraPlanningStrategies ++ Seq(
+ FileSourceStrategy,
+ DataSourceStrategy,
+ SpecialLimits,
+ InMemoryScans,
+ HiveTableScans,
+ Scripts,
+ Aggregation,
+ JoinSelection,
+ BasicOperators
+ )
+ }
}
}
- private def createPlannerCreator(
- associatedSparkSession: SparkSession,
- sqlConf: SQLConf,
- experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
- () =>
- new SparkPlanner(
- associatedSparkSession.sparkContext,
- sqlConf,
- experimentalMethods.extraStrategies)
- with HiveStrategies {
-
- override val sparkSession: SparkSession = associatedSparkSession
+ override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
- override def strategies: Seq[Strategy] = {
- experimentalMethods.extraStrategies ++ Seq(
- FileSourceStrategy,
- DataSourceStrategy,
- SpecialLimits,
- InMemoryScans,
- HiveTableScans,
- Scripts,
- Aggregation,
- JoinSelection,
- BasicOperators
- )
- }
- }
+ override def build(): HiveSessionState = {
+ val metadataHive: HiveClient = externalCatalog.client.newSession()
+ new HiveSessionState(
+ session.sparkContext,
+ session.sharedState,
+ conf,
+ experimentalMethods,
+ functionRegistry,
+ catalog,
+ sqlParser,
+ analyzer,
+ optimizer,
+ planner,
+ streamingQueryManager,
+ createQueryExecution,
+ createClone,
+ metadataHive)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b63ed76967..32ca69605e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -148,12 +148,14 @@ class TestHiveContext(
*
* @param sc SparkContext
* @param existingSharedState optional [[SharedState]]
+ * @param parentSessionState optional parent [[SessionState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[TestHiveSharedState],
+ @transient private val parentSessionState: Option[HiveSessionState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -161,6 +163,7 @@ private[hive] class TestHiveSparkSession(
this(
sc,
existingSharedState = None,
+ parentSessionState = None,
loadTestTables)
}
@@ -168,6 +171,7 @@ private[hive] class TestHiveSparkSession(
this(
sc,
existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+ parentSessionState = None,
loadTestTables)
}
@@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession(
@transient
override lazy val sessionState: HiveSessionState = {
- val testConf =
- new SQLConf {
- clear()
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
- override def clear(): Unit = {
- super.clear()
- TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
- }
- }
- val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan)
- val initHelper = HiveSessionState(this, testConf)
- SessionState.mergeSparkConf(testConf, sparkContext.getConf)
-
- new HiveSessionState(
- sparkContext,
- sharedState,
- testConf,
- initHelper.experimentalMethods,
- initHelper.functionRegistry,
- initHelper.catalog,
- initHelper.sqlParser,
- initHelper.metadataHive,
- initHelper.analyzer,
- initHelper.streamingQueryManager,
- queryExecutionCreator,
- initHelper.plannerCreator)
+ new TestHiveSessionStateBuilder(this, parentSessionState).build()
}
override def newSession(): TestHiveSparkSession = {
- new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
+ new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
+ }
+
+ override def cloneSession(): SparkSession = {
+ val result = new TestHiveSparkSession(
+ sparkContext,
+ Some(sharedState),
+ Some(sessionState),
+ loadTestTables)
+ result.sessionState // force copy of SessionState
+ result
}
private var cacheTables: Boolean = false
@@ -595,3 +584,18 @@ private[hive] object TestHiveContext {
}
}
+
+private[sql] class TestHiveSessionStateBuilder(
+ session: SparkSession,
+ state: Option[SessionState])
+ extends HiveSessionStateBuilder(session, state)
+ with WithTestConf {
+
+ override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs
+
+ override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
+ new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
+ }
+
+ override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
deleted file mode 100644
index 3b0f59b159..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.Range
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-class HiveSessionCatalogSuite extends TestHiveSingleton {
-
- test("clone HiveSessionCatalog") {
- val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-
- val tempTableName1 = "copytest1"
- val tempTableName2 = "copytest2"
- try {
- val tempTable1 = Range(1, 10, 1, 10)
- original.createTempView(tempTableName1, tempTable1, overrideIfExists = false)
-
- // check if tables copied over
- val clone = original.newSessionCatalogWith(
- spark,
- new SQLConf,
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
- assert(original ne clone)
- assert(clone.getTempView(tempTableName1) == Some(tempTable1))
-
- // check if clone and original independent
- clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false)
- assert(original.getTempView(tempTableName1) == Some(tempTable1))
-
- val tempTable2 = Range(1, 20, 2, 10)
- original.createTempView(tempTableName2, tempTable2, overrideIfExists = false)
- assert(clone.getTempView(tempTableName2).isEmpty)
- } finally {
- // Drop the created temp views from the global singleton HiveSession.
- original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true)
- original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true)
- }
- }
-
- test("clone SessionCatalog - current db") {
- val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
- val originalCurrentDatabase = original.getCurrentDatabase
- val db1 = "db1"
- val db2 = "db2"
- val db3 = "db3"
- try {
- original.createDatabase(newDb(db1), ignoreIfExists = true)
- original.createDatabase(newDb(db2), ignoreIfExists = true)
- original.createDatabase(newDb(db3), ignoreIfExists = true)
-
- original.setCurrentDatabase(db1)
-
- // check if tables copied over
- val clone = original.newSessionCatalogWith(
- spark,
- new SQLConf,
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
-
- // check if current db copied over
- assert(original ne clone)
- assert(clone.getCurrentDatabase == db1)
-
- // check if clone and original independent
- clone.setCurrentDatabase(db2)
- assert(original.getCurrentDatabase == db1)
- original.setCurrentDatabase(db3)
- assert(clone.getCurrentDatabase == db2)
- } finally {
- // Drop the created databases from the global singleton HiveSession.
- original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true)
- original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true)
- original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true)
- original.setCurrentDatabase(originalCurrentDatabase)
- }
- }
-
- def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
-
- def newDb(name: String): CatalogDatabase = {
- CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
- }
-}