-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala      |  46
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala         |  16
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala |  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala                 |  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala                   |  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala |  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                    | 180
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala            | 279
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala                      |  23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala                  |  76
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                    | 259
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                       |  60
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala             | 112
13 files changed, 547 insertions(+), 572 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a469d12451..72ab075408 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,7 +54,8 @@ class SessionCatalog(
functionRegistry: FunctionRegistry,
conf: CatalystConf,
hadoopConf: Configuration,
- parser: ParserInterface) extends Logging {
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader) extends Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec
@@ -69,8 +70,8 @@ class SessionCatalog(
functionRegistry,
conf,
new Configuration(),
- CatalystSqlParser)
- functionResourceLoader = DummyFunctionResourceLoader
+ CatalystSqlParser,
+ DummyFunctionResourceLoader)
}
// For testing only.
@@ -90,9 +91,7 @@ class SessionCatalog(
// check whether the temporary table or function exists, then, if not, operate on
// the corresponding item in the current database.
@GuardedBy("this")
- protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
-
- @volatile var functionResourceLoader: FunctionResourceLoader = _
+ protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
/**
* Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
@@ -1059,9 +1058,6 @@ class SessionCatalog(
* by a tuple (resource type, resource uri).
*/
def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
- if (functionResourceLoader == null) {
- throw new IllegalStateException("functionResourceLoader has not yet been initialized")
- }
resources.foreach(functionResourceLoader.loadResource)
}
@@ -1259,28 +1255,16 @@ class SessionCatalog(
}
/**
- * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
- * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
+ * Copy the current state of the catalog to another catalog.
+ *
+ * This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied
+ * state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be
+ * because the target [[SessionCatalog]] should not be published at this point. The caller must
+ * synchronize on the target if this assumption does not hold.
*/
- def newSessionCatalogWith(
- conf: CatalystConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): SessionCatalog = {
- val catalog = new SessionCatalog(
- externalCatalog,
- globalTempViewManager,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
-
- synchronized {
- catalog.currentDb = currentDb
- // copy over temporary tables
- tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
- }
-
- catalog
+ private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized {
+ target.currentDb = currentDb
+ // copy over temporary tables
+ tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2))
}
}
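
Note: with newSessionCatalogWith gone, callers now build a fresh catalog and copy state into it, as the updated SessionCatalogSuite below does. A minimal sketch of that pattern (not part of this patch; copyStateTo is private[sql], so this only compiles inside one of Spark's own org.apache.spark.sql packages):

import org.apache.spark.sql.catalyst.catalog.SessionCatalog

// Build a fresh catalog against the same external catalog, then copy the
// session-local state (current database and temporary tables) into it.
def cloneCatalog(original: SessionCatalog): SessionCatalog = {
  val copy = new SessionCatalog(original.externalCatalog)  // test-only convenience constructor
  original.copyStateTo(copy)                               // synchronizes on `original`
  copy
}
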
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dbe3ded4bb..dbf479d215 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,20 +17,14 @@
package org.apache.spark.sql.catalyst.optimizer
-import scala.annotation.tailrec
-import scala.collection.immutable.HashSet
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) ::
- Batch("Operator Optimizations", fixedPoint,
+ Batch("Operator Optimizations", fixedPoint, Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin(conf),
@@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
- SimplifyCreateMapOps) ::
+ SimplifyCreateMapOps) ++
+ extendedOperatorOptimizationRules: _*) ::
Batch("Check Cartesian Products", Once,
CheckCartesianProducts(conf)) ::
Batch("Join Reorder", Once,
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
s.withNewPlan(newPlan)
}
}
+
+ /**
+ * Override to provide additional rules for the operator optimization batch.
+ */
+ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
}
/**
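
Note: a sketch, not part of this patch, of how an Optimizer subclass can use the new hook; the rule and class names are hypothetical.

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that leaves the plan untouched; stands in for a real rewrite.
object PassThroughRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class CustomOptimizer(catalog: SessionCatalog, conf: CatalystConf)
  extends Optimizer(catalog, conf) {
  // Appended to the "Operator Optimizations" batch defined above.
  override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    PassThroughRule :: Nil
}
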
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ca4ce1c117..56bca73a88 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.hadoop.conf.Configuration
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
@@ -1331,17 +1329,15 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
- test("clone SessionCatalog - temp views") {
+ test("copy SessionCatalog state - temp views") {
withEmptyCatalog { original =>
val tempTable1 = Range(1, 10, 1, 10)
original.createTempView("copytest1", tempTable1, overrideIfExists = false)
// check if tables copied over
- val clone = original.newSessionCatalogWith(
- SimpleCatalystConf(caseSensitiveAnalysis = true),
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
+ val clone = new SessionCatalog(original.externalCatalog)
+ original.copyStateTo(clone)
+
assert(original ne clone)
assert(clone.getTempView("copytest1") == Some(tempTable1))
@@ -1355,7 +1351,7 @@ abstract class SessionCatalogSuite extends PlanTest {
}
}
- test("clone SessionCatalog - current db") {
+ test("copy SessionCatalog state - current db") {
withEmptyCatalog { original =>
val db1 = "db1"
val db2 = "db2"
@@ -1368,11 +1364,9 @@ abstract class SessionCatalogSuite extends PlanTest {
original.setCurrentDatabase(db1)
// check if current db copied over
- val clone = original.newSessionCatalogWith(
- SimpleCatalystConf(caseSensitiveAnalysis = true),
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
+ val clone = new SessionCatalog(original.externalCatalog)
+ original.copyStateTo(clone)
+
assert(original ne clone)
assert(clone.getCurrentDatabase == db1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 981728331d..2cdfb7a782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,9 +30,17 @@ class SparkOptimizer(
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalog, conf) {
- override def batches: Seq[Batch] = super.batches :+
+ override def batches: Seq[Batch] = (super.batches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
- Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+ Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
+ postHocOptimizationBatches :+
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
+
+ /**
+ * Optimization batches that are executed after the regular optimization batches, but before the
+ * batch executing the [[ExperimentalMethods]] optimizer rules. This hook can be used to add
+ * custom optimizer batches to the Spark optimizer.
+ */
+ def postHocOptimizationBatches: Seq[Batch] = Nil
}
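
Note: a sketch, not part of this patch, of the new SparkOptimizer hook; names are hypothetical, and the code is assumed to live alongside Spark's own sql sources since some referenced types are not public.

import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.internal.SQLConf

class PostHocSparkOptimizer(
    catalog: SessionCatalog,
    conf: SQLConf,
    experimentalMethods: ExperimentalMethods)
  extends SparkOptimizer(catalog, conf, experimentalMethods) {

  // Hypothetical rule; a real batch would perform an actual rewrite here.
  private object PassThroughRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Runs once, after the built-in batches and before "User Provided Optimizers".
  override def postHocOptimizationBatches: Seq[Batch] =
    Batch("Post Hoc Pass", Once, PassThroughRule) :: Nil
}
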
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 678241656c..6566502bd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -27,13 +27,14 @@ import org.apache.spark.sql.internal.SQLConf
class SparkPlanner(
val sparkContext: SparkContext,
val conf: SQLConf,
- val extraStrategies: Seq[Strategy])
+ val experimentalMethods: ExperimentalMethods)
extends SparkStrategies {
def numPartitions: Int = conf.numShufflePartitions
def strategies: Seq[Strategy] =
- extraStrategies ++ (
+ experimentalMethods.extraStrategies ++
+ extraPlanningStrategies ++ (
FileSourceStrategy ::
DataSourceStrategy ::
SpecialLimits ::
@@ -42,6 +43,12 @@ class SparkPlanner(
InMemoryScans ::
BasicOperators :: Nil)
+ /**
+ * Override to add extra planning strategies to the planner. These strategies are tried after
+ * the strategies defined in [[ExperimentalMethods]], and before the regular strategies.
+ */
+ def extraPlanningStrategies: Seq[Strategy] = Nil
+
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
plan.collect {
case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
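
Note: the streaming planner in the next file uses this hook through an anonymous subclass; the sketch below (not part of this patch, hypothetical names, same placement assumption as above) shows a standalone subclass.

import org.apache.spark.SparkContext
import org.apache.spark.sql.{ExperimentalMethods, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanner}
import org.apache.spark.sql.internal.SQLConf

class CustomPlanner(sc: SparkContext, conf: SQLConf, experimental: ExperimentalMethods)
  extends SparkPlanner(sc, conf, experimental) {

  // Hypothetical strategy that matches nothing and defers to the built-in strategies.
  private object NoOpStrategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
  }

  // Tried after ExperimentalMethods.extraStrategies and before the regular strategies.
  override def extraPlanningStrategies: Seq[Strategy] = NoOpStrategy :: Nil
}
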
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 0f0e4a91f8..622e049630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
@@ -40,20 +40,17 @@ class IncrementalExecution(
offsetSeqMetadata: OffsetSeqMetadata)
extends QueryExecution(sparkSession, logicalPlan) with Logging {
- // TODO: make this always part of planning.
- val streamingExtraStrategies =
- sparkSession.sessionState.planner.StatefulAggregationStrategy +:
- sparkSession.sessionState.planner.FlatMapGroupsWithStateStrategy +:
- sparkSession.sessionState.planner.StreamingRelationStrategy +:
- sparkSession.sessionState.planner.StreamingDeduplicationStrategy +:
- sparkSession.sessionState.experimentalMethods.extraStrategies
-
// Modified planner with stateful operations.
- override def planner: SparkPlanner =
- new SparkPlanner(
+ override val planner: SparkPlanner = new SparkPlanner(
sparkSession.sparkContext,
sparkSession.sessionState.conf,
- streamingExtraStrategies)
+ sparkSession.sessionState.experimentalMethods) {
+ override def extraPlanningStrategies: Seq[Strategy] =
+ StatefulAggregationStrategy ::
+ FlatMapGroupsWithStateStrategy ::
+ StreamingRelationStrategy ::
+ StreamingDeduplicationStrategy :: Nil
+ }
/**
* See [SPARK-18339]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ce80604bd3..b5b0bb0bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,22 +22,21 @@ import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.ExecutionListenerManager
-
/**
* A class that holds all session-specific state in a given [[SparkSession]].
+ *
* @param sparkContext The [[SparkContext]].
* @param sharedState The shared state.
* @param conf SQL-specific key-value configurations.
@@ -46,9 +45,11 @@ import org.apache.spark.sql.util.ExecutionListenerManager
* @param catalog Internal catalog for managing table and database states.
* @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
* @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- * [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
*/
private[sql] class SessionState(
sparkContext: SparkContext,
@@ -59,8 +60,11 @@ private[sql] class SessionState(
val catalog: SessionCatalog,
val sqlParser: ParserInterface,
val analyzer: Analyzer,
+ val optimizer: Optimizer,
+ val planner: SparkPlanner,
val streamingQueryManager: StreamingQueryManager,
- val queryExecutionCreator: LogicalPlan => QueryExecution) {
+ createQueryExecution: LogicalPlan => QueryExecution,
+ createClone: (SparkSession, SessionState) => SessionState) {
def newHadoopConf(): Configuration = SessionState.newHadoopConf(
sparkContext.hadoopConfiguration,
@@ -77,41 +81,12 @@ private[sql] class SessionState(
}
/**
- * A class for loading resources specified by a function.
- */
- val functionResourceLoader: FunctionResourceLoader = {
- new FunctionResourceLoader {
- override def loadResource(resource: FunctionResource): Unit = {
- resource.resourceType match {
- case JarResource => addJar(resource.uri)
- case FileResource => sparkContext.addFile(resource.uri)
- case ArchiveResource =>
- throw new AnalysisException(
- "Archive is not allowed to be loaded. If YARN mode is used, " +
- "please use --archives options while calling spark-submit.")
- }
- }
- }
- }
-
- /**
* Interface exposed to the user for registering user-defined functions.
* Note that the user-defined functions must be deterministic.
*/
val udf: UDFRegistration = new UDFRegistration(functionRegistry)
/**
- * Logical query plan optimizer.
- */
- val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
-
- /**
- * Planner that converts optimized logical plans to physical plans.
- */
- def planner: SparkPlanner =
- new SparkPlanner(sparkContext, conf, experimentalMethods.extraStrategies)
-
- /**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
* that listen for execution metrics.
*/
@@ -120,38 +95,13 @@ private[sql] class SessionState(
/**
* Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
*/
- def clone(newSparkSession: SparkSession): SessionState = {
- val sparkContext = newSparkSession.sparkContext
- val confCopy = conf.clone()
- val functionRegistryCopy = functionRegistry.clone()
- val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
- val catalogCopy = catalog.newSessionCatalogWith(
- confCopy,
- SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
- functionRegistryCopy,
- sqlParser)
- val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
- SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
- new SessionState(
- sparkContext,
- newSparkSession.sharedState,
- confCopy,
- experimentalMethods.clone(),
- functionRegistryCopy,
- catalogCopy,
- sqlParser,
- SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
- new StreamingQueryManager(newSparkSession),
- queryExecutionCreator)
- }
+ def clone(newSparkSession: SparkSession): SessionState = createClone(newSparkSession, this)
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
// ------------------------------------------------------
- def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan)
+ def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
@@ -179,53 +129,12 @@ private[sql] class SessionState(
}
}
-
private[sql] object SessionState {
-
- def apply(sparkSession: SparkSession): SessionState = {
- apply(sparkSession, new SQLConf)
- }
-
- def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
- val sparkContext = sparkSession.sparkContext
-
- // Automatically extract all entries and put them in our SQLConf
- mergeSparkConf(sqlConf, sparkContext.getConf)
-
- val functionRegistry = FunctionRegistry.builtin.clone()
-
- val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
-
- val catalog = new SessionCatalog(
- sparkSession.sharedState.externalCatalog,
- sparkSession.sharedState.globalTempViewManager,
- functionRegistry,
- sqlConf,
- newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
- sqlParser)
-
- val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
-
- val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession)
-
- val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan)
-
- val sessionState = new SessionState(
- sparkContext,
- sparkSession.sharedState,
- sqlConf,
- new ExperimentalMethods,
- functionRegistry,
- catalog,
- sqlParser,
- analyzer,
- streamingQueryManager,
- queryExecutionCreator)
- // functionResourceLoader needs to access SessionState.addJar, so it cannot be created before
- // creating SessionState. Setting `catalog.functionResourceLoader` here is safe since the caller
- // cannot use SessionCatalog before we return SessionState.
- catalog.functionResourceLoader = sessionState.functionResourceLoader
- sessionState
+ /**
+ * Create a new [[SessionState]] for the given session.
+ */
+ def apply(session: SparkSession): SessionState = {
+ new SessionStateBuilder(session).build()
}
def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
@@ -233,34 +142,33 @@ private[sql] object SessionState {
sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) }
newHadoopConf
}
+}
- /**
- * Create an logical query plan `Analyzer` with rules specific to a non-Hive `SessionState`.
- */
- private def createAnalyzer(
- sparkSession: SparkSession,
- catalog: SessionCatalog,
- sqlConf: SQLConf): Analyzer = {
- new Analyzer(catalog, sqlConf) {
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new FindDataSourceTable(sparkSession) ::
- new ResolveSQLOnFile(sparkSession) :: Nil
-
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- PreprocessTableCreation(sparkSession) ::
- PreprocessTableInsertion(sqlConf) ::
- DataSourceAnalysis(sqlConf) :: Nil
-
- override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
- }
- }
+/**
+ * Concrete implementation of a [[BaseSessionStateBuilder]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class SessionStateBuilder(
+ session: SparkSession,
+ parentState: Option[SessionState] = None)
+ extends BaseSessionStateBuilder(session, parentState) {
+ override protected def newBuilder: NewBuilder = new SessionStateBuilder(_, _)
+}
- /**
- * Extract entries from `SparkConf` and put them in the `SQLConf`
- */
- def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
- sparkConf.getAll.foreach { case (k, v) =>
- sqlConf.setConfString(k, v)
+/**
+ * Session shared [[FunctionResourceLoader]].
+ */
+@InterfaceStability.Unstable
+class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ resource.resourceType match {
+ case JarResource => session.sessionState.addJar(resource.uri)
+ case FileResource => session.sparkContext.addFile(resource.uri)
+ case ArchiveResource =>
+ throw new AnalysisException(
+ "Archive is not allowed to be loaded. If YARN mode is used, " +
+ "please use --archives options while calling spark-submit.")
}
}
}
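
Note: a sketch, not part of this patch, of the new construction and clone flow; SessionState is private[sql], so this illustrates what happens inside Spark rather than user-facing code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder}

object SessionStateFlow {
  // Only compiles inside an org.apache.spark.sql package at this commit.
  def buildAndClone(spark: SparkSession): (SessionState, SessionState) = {
    // SessionState(spark) is now shorthand for building with the default builder.
    val state = new SessionStateBuilder(spark).build()
    // clone() invokes the createClone function captured at build time, which runs a
    // fresh builder with `state` as parent, copying conf, functionRegistry,
    // experimentalMethods and catalog state into the new SessionState.
    val cloned = state.clone(spark)
    (state, cloned)
  }
}
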
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
new file mode 100644
index 0000000000..6b5559adb1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.streaming.StreamingQueryManager
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session state, and creates a session
+ * state when `build` is called. Components should only be initialized once. This is not a problem
+ * for most components as they are only used in the `build` function. However, some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are used as
+ * for other components and are shared as a result. These components are defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some dependencies between the
+ * components (they are documented per dependency); a developer should respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session
+ * state will clone the parent session state's `conf`, `functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is called, and not before.
+ */
+@Experimental
+@InterfaceStability.Unstable
+abstract class BaseSessionStateBuilder(
+ val session: SparkSession,
+ val parentState: Option[SessionState] = None) {
+ type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder
+
+ /**
+ * Function that produces a new instance of the SessionStateBuilder. This is used by the
+ * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
+ * [[SessionStateBuilder]].
+ */
+ protected def newBuilder: NewBuilder
+
+ /**
+ * Extract entries from `SparkConf` and put them in the `SQLConf`
+ */
+ protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
+ sparkConf.getAll.foreach { case (k, v) =>
+ sqlConf.setConfString(k, v)
+ }
+ }
+
+ /**
+ * SQL-specific key-value configurations.
+ *
+   * These either get cloned from a pre-existing instance or are newly created. The conf is always
+ * merged with its [[SparkConf]].
+ */
+ protected lazy val conf: SQLConf = {
+ val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+ mergeSparkConf(conf, session.sparkContext.conf)
+ conf
+ }
+
+ /**
+ * Internal catalog managing functions registered by the user.
+ *
+ * This either gets cloned from a pre-existing version or cloned from the built-in registry.
+ */
+ protected lazy val functionRegistry: FunctionRegistry = {
+ parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+ }
+
+ /**
+ * Experimental methods that can be used to define custom optimization rules and custom planning
+ * strategies.
+ *
+   * This either gets cloned from a pre-existing version or is newly created.
+ */
+ protected lazy val experimentalMethods: ExperimentalMethods = {
+ parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods)
+ }
+
+ /**
+ * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+ *
+ * Note: this depends on the `conf` field.
+ */
+ protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
+
+ /**
+ * Catalog for managing table and database states. If there is a pre-existing catalog, the state
+ * of that catalog (temp tables & current database) will be copied into the new catalog.
+ *
+ * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
+ */
+ protected lazy val catalog: SessionCatalog = {
+ val catalog = new SessionCatalog(
+ session.sharedState.externalCatalog,
+ session.sharedState.globalTempViewManager,
+ functionRegistry,
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+ sqlParser,
+ new SessionFunctionResourceLoader(session))
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
+ }
+
+ /**
+ * Logical query plan analyzer for resolving unresolved attributes and relations.
+ *
+ * Note: this depends on the `conf` and `catalog` fields.
+ */
+ protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ customResolutionRules
+
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ customPostHocResolutionRules
+
+ override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+ PreWriteCheck +:
+ HiveOnlyCheck +:
+ customCheckRules
+ }
+
+ /**
+ * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating
+ * your own Analyzer.
+ *
+ * Note that this may NOT depend on the `analyzer` function.
+ */
+ protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+ /**
+ * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of
+ * creating your own Analyzer.
+ *
+ * Note that this may NOT depend on the `analyzer` function.
+ */
+ protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+ /**
+ * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating
+ * your own Analyzer.
+ *
+ * Note that this may NOT depend on the `analyzer` function.
+ */
+ protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
+
+ /**
+ * Logical query plan optimizer.
+ *
+ * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields.
+ */
+ protected def optimizer: Optimizer = {
+ new SparkOptimizer(catalog, conf, experimentalMethods) {
+ override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+ super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
+ }
+ }
+
+ /**
+ * Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead
+ * of creating your own Optimizer.
+ *
+ * Note that this may NOT depend on the `optimizer` function.
+ */
+ protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
+
+ /**
+ * Planner that converts optimized logical plans to physical plans.
+ *
+ * Note: this depends on the `conf` and `experimentalMethods` fields.
+ */
+ protected def planner: SparkPlanner = {
+ new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
+ override def extraPlanningStrategies: Seq[Strategy] =
+ super.extraPlanningStrategies ++ customPlanningStrategies
+ }
+ }
+
+ /**
+ * Custom strategies to add to the planner. Prefer overriding this instead of creating
+ * your own Planner.
+ *
+ * Note that this may NOT depend on the `planner` function.
+ */
+ protected def customPlanningStrategies: Seq[Strategy] = Nil
+
+ /**
+ * Create a query execution object.
+ */
+ protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
+ new QueryExecution(session, plan)
+ }
+
+ /**
+ * Interface to start and stop streaming queries.
+ */
+ protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)
+
+ /**
+ * Function used to make clones of the session state.
+ */
+ protected def createClone: (SparkSession, SessionState) => SessionState = {
+ val createBuilder = newBuilder
+ (session, state) => createBuilder(session, Option(state)).build()
+ }
+
+ /**
+ * Build the [[SessionState]].
+ */
+ def build(): SessionState = {
+ new SessionState(
+ session.sparkContext,
+ session.sharedState,
+ conf,
+ experimentalMethods,
+ functionRegistry,
+ catalog,
+ sqlParser,
+ analyzer,
+ optimizer,
+ planner,
+ streamingQueryManager,
+ createQueryExecution,
+ createClone)
+ }
+}
+
+/**
+ * Helper trait for using SessionStateBuilders during tests.
+ */
+private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
+ def overrideConfs: Map[String, String]
+
+ override protected lazy val conf: SQLConf = {
+ val conf = parentState.map(_.conf.clone()).getOrElse {
+ new SQLConf {
+ clear()
+ override def clear(): Unit = {
+ super.clear()
+ // Make sure we start with the default test configs even after clear
+ overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+ }
+ }
+ }
+ mergeSparkConf(conf, session.sparkContext.conf)
+ conf
+ }
+}
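
Note: a sketch, not part of this patch, of the intended extension pattern: subclass the builder, override the custom* hooks, and override newBuilder so clone() keeps producing the same builder. Names are hypothetical, and the class is assumed to sit in an org.apache.spark.sql package because SessionState is private[sql] at this commit.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}

class CustomSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // Hypothetical pass-through rule standing in for a real analysis/optimization rule.
  private object PassThroughRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  // Injected into the Analyzer's extendedResolutionRules.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    PassThroughRule :: Nil

  // Injected into the Optimizer's operator optimization batch.
  override protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    PassThroughRule :: Nil

  // Required so that SessionState.clone() rebuilds this same kind of state.
  override protected def newBuilder: NewBuilder = new CustomSessionStateBuilder(_, _)
}

Calling new CustomSessionStateBuilder(spark).build() then yields a SessionState carrying the extra rules, and cloning the session preserves them through newBuilder.
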
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 898a2fb4f3..b01977a238 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf}
/**
* A special [[SparkSession]] prepared for testing.
@@ -35,16 +35,9 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) {
}
@transient
- override lazy val sessionState: SessionState = SessionState(
- this,
- new SQLConf {
- clear()
- override def clear(): Unit = {
- super.clear()
- // Make sure we start with the default test configs even after clear
- TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
- }
- })
+ override lazy val sessionState: SessionState = {
+ new TestSQLSessionStateBuilder(this, None).build()
+ }
// Needed for Java tests
def loadTestData(): Unit = {
@@ -67,3 +60,11 @@ private[sql] object TestSQLContext {
// Fewer shuffle partitions to speed up testing.
SQLConf.SHUFFLE_PARTITIONS.key -> "5")
}
+
+private[sql] class TestSQLSessionStateBuilder(
+ session: SparkSession,
+ state: Option[SessionState])
+ extends SessionStateBuilder(session, state) with WithTestConf {
+ override def overrideConfs: Map[String, String] = TestSQLContext.overrideConfs
+ override def newBuilder: NewBuilder = new TestSQLSessionStateBuilder(_, _)
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 6b7599e3d3..2cc20a791d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
@@ -47,14 +47,16 @@ private[sql] class HiveSessionCatalog(
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
- parser: ParserInterface)
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionRegistry,
conf,
hadoopConf,
- parser) {
+ parser,
+ functionResourceLoader) {
// ----------------------------------------------------------------
// | Methods and fields for interacting with HiveMetastoreCatalog |
@@ -69,47 +71,6 @@ private[sql] class HiveSessionCatalog(
metastoreCatalog.hiveDefaultTableFilePath(name)
}
- /**
- * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and
- * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
- */
- def newSessionCatalogWith(
- newSparkSession: SparkSession,
- conf: SQLConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): HiveSessionCatalog = {
- val catalog = HiveSessionCatalog(
- newSparkSession,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
-
- synchronized {
- catalog.currentDb = currentDb
- // copy over temporary tables
- tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
- }
-
- catalog
- }
-
- /**
- * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add
- * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However,
- * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we can a new version of
- * `newSessionCatalogWith` and disable this one.
- *
- * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly.
- */
- override def newSessionCatalogWith(
- conf: CatalystConf,
- hadoopConf: Configuration,
- functionRegistry: FunctionRegistry,
- parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException(
- "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession")
-
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = metastoreCatalog.getQualifiedTableName(table)
@@ -250,28 +211,3 @@ private[sql] class HiveSessionCatalog(
"histogram_numeric"
)
}
-
-private[sql] object HiveSessionCatalog {
-
- def apply(
- sparkSession: SparkSession,
- functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hadoopConf: Configuration,
- parser: ParserInterface): HiveSessionCatalog = {
- // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
- // essentially a cache for metastore tables. However, it relies on a lot of session-specific
- // things so it would be a lot of work to split its functionality between HiveSessionCatalog
- // and HiveCatalog. We should still do it at some point...
- val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
-
- new HiveSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- metastoreCatalog,
- functionRegistry,
- conf,
- hadoopConf,
- parser)
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index cb8bcb8591..49ff8478f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,20 +18,23 @@
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
import org.apache.spark.sql.streaming.StreamingQueryManager
/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
+ *
* @param sparkContext The [[SparkContext]].
* @param sharedState The shared state.
* @param conf SQL-specific key-value configurations.
@@ -40,12 +43,14 @@ import org.apache.spark.sql.streaming.StreamingQueryManager
* @param catalog Internal catalog for managing table and database states that uses Hive client for
* interacting with the metastore.
* @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param metadataHive The Hive metadata client.
* @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- * [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
- * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans and that takes
+ * Hive-specific strategies into account.
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
+ * @param metadataHive The Hive metadata client.
*/
private[hive] class HiveSessionState(
sparkContext: SparkContext,
@@ -55,11 +60,13 @@ private[hive] class HiveSessionState(
functionRegistry: FunctionRegistry,
override val catalog: HiveSessionCatalog,
sqlParser: ParserInterface,
- val metadataHive: HiveClient,
analyzer: Analyzer,
+ optimizer: Optimizer,
+ planner: SparkPlanner,
streamingQueryManager: StreamingQueryManager,
- queryExecutionCreator: LogicalPlan => QueryExecution,
- val plannerCreator: () => SparkPlanner)
+ createQueryExecution: LogicalPlan => QueryExecution,
+ createClone: (SparkSession, SessionState) => SessionState,
+ val metadataHive: HiveClient)
extends SessionState(
sparkContext,
sharedState,
@@ -69,14 +76,11 @@ private[hive] class HiveSessionState(
catalog,
sqlParser,
analyzer,
+ optimizer,
+ planner,
streamingQueryManager,
- queryExecutionCreator) { self =>
-
- /**
- * Planner that takes into account Hive-specific strategies.
- */
- override def planner: SparkPlanner = plannerCreator()
-
+ createQueryExecution,
+ createClone) {
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -121,150 +125,115 @@ private[hive] class HiveSessionState(
def hiveThriftServerAsync: Boolean = {
conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
+}
+private[hive] object HiveSessionState {
/**
- * Get an identical copy of the `HiveSessionState`.
- * This should ideally reuse the `SessionState.clone` but cannot do so.
- * Doing that will throw an exception when trying to clone the catalog.
+ * Create a new [[HiveSessionState]] for the given session.
*/
- override def clone(newSparkSession: SparkSession): HiveSessionState = {
- val sparkContext = newSparkSession.sparkContext
- val confCopy = conf.clone()
- val functionRegistryCopy = functionRegistry.clone()
- val experimentalMethodsCopy = experimentalMethods.clone()
- val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
- val catalogCopy = catalog.newSessionCatalogWith(
- newSparkSession,
- confCopy,
- SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
- functionRegistryCopy,
- sqlParser)
- val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
- val hiveClient =
- newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .newSession()
-
- SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
- new HiveSessionState(
- sparkContext,
- newSparkSession.sharedState,
- confCopy,
- experimentalMethodsCopy,
- functionRegistryCopy,
- catalogCopy,
- sqlParser,
- hiveClient,
- HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
- new StreamingQueryManager(newSparkSession),
- queryExecutionCreator,
- HiveSessionState.createPlannerCreator(
- newSparkSession,
- confCopy,
- experimentalMethodsCopy))
+ def apply(session: SparkSession): HiveSessionState = {
+ new HiveSessionStateBuilder(session).build()
}
-
}
-private[hive] object HiveSessionState {
-
- def apply(sparkSession: SparkSession): HiveSessionState = {
- apply(sparkSession, new SQLConf)
- }
-
- def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = {
- val initHelper = SessionState(sparkSession, conf)
-
- val sparkContext = sparkSession.sparkContext
-
- val catalog = HiveSessionCatalog(
- sparkSession,
- initHelper.functionRegistry,
- initHelper.conf,
- SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
- initHelper.sqlParser)
-
- val metadataHive: HiveClient =
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .newSession()
-
- val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+/**
+ * Builder that produces a [[HiveSessionState]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None)
+ extends BaseSessionStateBuilder(session, parentState) {
- val plannerCreator = createPlannerCreator(
- sparkSession,
- initHelper.conf,
- initHelper.experimentalMethods)
+ private def externalCatalog: HiveExternalCatalog =
+ session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
- val hiveSessionState = new HiveSessionState(
- sparkContext,
- sparkSession.sharedState,
- initHelper.conf,
- initHelper.experimentalMethods,
- initHelper.functionRegistry,
- catalog,
- initHelper.sqlParser,
- metadataHive,
- analyzer,
- initHelper.streamingQueryManager,
- initHelper.queryExecutionCreator,
- plannerCreator)
- catalog.functionResourceLoader = hiveSessionState.functionResourceLoader
- hiveSessionState
+ /**
+ * Create a [[HiveSessionCatalog]].
+ */
+ override protected lazy val catalog: HiveSessionCatalog = {
+ val catalog = new HiveSessionCatalog(
+ externalCatalog,
+ session.sharedState.globalTempViewManager,
+ new HiveMetastoreCatalog(session),
+ functionRegistry,
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+ sqlParser,
+ new SessionFunctionResourceLoader(session))
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
}
/**
- * Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
+ * A logical query plan `Analyzer` with rules specific to Hive.
*/
- private def createAnalyzer(
- sparkSession: SparkSession,
- catalog: HiveSessionCatalog,
- sqlConf: SQLConf): Analyzer = {
- new Analyzer(catalog, sqlConf) {
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new ResolveHiveSerdeTable(sparkSession) ::
- new FindDataSourceTable(sparkSession) ::
- new ResolveSQLOnFile(sparkSession) :: Nil
-
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- new DetermineTableStats(sparkSession) ::
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- PreprocessTableCreation(sparkSession) ::
- PreprocessTableInsertion(sqlConf) ::
- DataSourceAnalysis(sqlConf) ::
- HiveAnalysis :: Nil
+ override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new ResolveHiveSerdeTable(session) +:
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ customResolutionRules
+
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ new DetermineTableStats(session) +:
+ catalog.ParquetConversions +:
+ catalog.OrcConversions +:
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ HiveAnalysis +:
+ customPostHocResolutionRules
+
+ override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+ PreWriteCheck +:
+ customCheckRules
+ }
- override val extendedCheckRules = Seq(PreWriteCheck)
+ /**
+ * Planner that takes into account Hive-specific strategies.
+ */
+ override protected def planner: SparkPlanner = {
+ new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies {
+ override val sparkSession: SparkSession = session
+
+ override def extraPlanningStrategies: Seq[Strategy] =
+ super.extraPlanningStrategies ++ customPlanningStrategies
+
+ override def strategies: Seq[Strategy] = {
+ experimentalMethods.extraStrategies ++
+ extraPlanningStrategies ++ Seq(
+ FileSourceStrategy,
+ DataSourceStrategy,
+ SpecialLimits,
+ InMemoryScans,
+ HiveTableScans,
+ Scripts,
+ Aggregation,
+ JoinSelection,
+ BasicOperators
+ )
+ }
}
}
- private def createPlannerCreator(
- associatedSparkSession: SparkSession,
- sqlConf: SQLConf,
- experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
- () =>
- new SparkPlanner(
- associatedSparkSession.sparkContext,
- sqlConf,
- experimentalMethods.extraStrategies)
- with HiveStrategies {
-
- override val sparkSession: SparkSession = associatedSparkSession
+ override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
- override def strategies: Seq[Strategy] = {
- experimentalMethods.extraStrategies ++ Seq(
- FileSourceStrategy,
- DataSourceStrategy,
- SpecialLimits,
- InMemoryScans,
- HiveTableScans,
- Scripts,
- Aggregation,
- JoinSelection,
- BasicOperators
- )
- }
- }
+ override def build(): HiveSessionState = {
+ val metadataHive: HiveClient = externalCatalog.client.newSession()
+ new HiveSessionState(
+ session.sparkContext,
+ session.sharedState,
+ conf,
+ experimentalMethods,
+ functionRegistry,
+ catalog,
+ sqlParser,
+ analyzer,
+ optimizer,
+ planner,
+ streamingQueryManager,
+ createQueryExecution,
+ createClone,
+ metadataHive)
}
}
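
Note: since the Hive planner above folds customPlanningStrategies into extraPlanningStrategies, a builder subclass can add strategies without redefining the planner. A sketch, not part of this patch, with hypothetical names and the same placement caveat as before:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.HiveSessionStateBuilder
import org.apache.spark.sql.internal.SessionState

class CustomHiveSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends HiveSessionStateBuilder(session, parentState) {

  // Hypothetical strategy that never matches; real code would plan some logical operator here.
  private object NoOpStrategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
  }

  // Picked up by the Hive-aware planner through extraPlanningStrategies.
  override protected def customPlanningStrategies: Seq[Strategy] = NoOpStrategy :: Nil

  override protected def newBuilder: NewBuilder = new CustomHiveSessionStateBuilder(_, _)
}
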
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b63ed76967..32ca69605e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -148,12 +148,14 @@ class TestHiveContext(
*
* @param sc SparkContext
* @param existingSharedState optional [[SharedState]]
+ * @param parentSessionState optional parent [[SessionState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
@transient private val existingSharedState: Option[TestHiveSharedState],
+ @transient private val parentSessionState: Option[HiveSessionState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -161,6 +163,7 @@ private[hive] class TestHiveSparkSession(
this(
sc,
existingSharedState = None,
+ parentSessionState = None,
loadTestTables)
}
@@ -168,6 +171,7 @@ private[hive] class TestHiveSparkSession(
this(
sc,
existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+ parentSessionState = None,
loadTestTables)
}
@@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession(
@transient
override lazy val sessionState: HiveSessionState = {
- val testConf =
- new SQLConf {
- clear()
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
- override def clear(): Unit = {
- super.clear()
- TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
- }
- }
- val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan)
- val initHelper = HiveSessionState(this, testConf)
- SessionState.mergeSparkConf(testConf, sparkContext.getConf)
-
- new HiveSessionState(
- sparkContext,
- sharedState,
- testConf,
- initHelper.experimentalMethods,
- initHelper.functionRegistry,
- initHelper.catalog,
- initHelper.sqlParser,
- initHelper.metadataHive,
- initHelper.analyzer,
- initHelper.streamingQueryManager,
- queryExecutionCreator,
- initHelper.plannerCreator)
+ new TestHiveSessionStateBuilder(this, parentSessionState).build()
}
override def newSession(): TestHiveSparkSession = {
- new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
+ new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
+ }
+
+ override def cloneSession(): SparkSession = {
+ val result = new TestHiveSparkSession(
+ sparkContext,
+ Some(sharedState),
+ Some(sessionState),
+ loadTestTables)
+ result.sessionState // force copy of SessionState
+ result
}
private var cacheTables: Boolean = false
@@ -595,3 +584,18 @@ private[hive] object TestHiveContext {
}
}
+
+private[sql] class TestHiveSessionStateBuilder(
+ session: SparkSession,
+ state: Option[SessionState])
+ extends HiveSessionStateBuilder(session, state)
+ with WithTestConf {
+
+ override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs
+
+ override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
+ new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
+ }
+
+ override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
deleted file mode 100644
index 3b0f59b159..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.Range
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-class HiveSessionCatalogSuite extends TestHiveSingleton {
-
- test("clone HiveSessionCatalog") {
- val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-
- val tempTableName1 = "copytest1"
- val tempTableName2 = "copytest2"
- try {
- val tempTable1 = Range(1, 10, 1, 10)
- original.createTempView(tempTableName1, tempTable1, overrideIfExists = false)
-
- // check if tables copied over
- val clone = original.newSessionCatalogWith(
- spark,
- new SQLConf,
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
- assert(original ne clone)
- assert(clone.getTempView(tempTableName1) == Some(tempTable1))
-
- // check if clone and original independent
- clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false)
- assert(original.getTempView(tempTableName1) == Some(tempTable1))
-
- val tempTable2 = Range(1, 20, 2, 10)
- original.createTempView(tempTableName2, tempTable2, overrideIfExists = false)
- assert(clone.getTempView(tempTableName2).isEmpty)
- } finally {
- // Drop the created temp views from the global singleton HiveSession.
- original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true)
- original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true)
- }
- }
-
- test("clone SessionCatalog - current db") {
- val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
- val originalCurrentDatabase = original.getCurrentDatabase
- val db1 = "db1"
- val db2 = "db2"
- val db3 = "db3"
- try {
- original.createDatabase(newDb(db1), ignoreIfExists = true)
- original.createDatabase(newDb(db2), ignoreIfExists = true)
- original.createDatabase(newDb(db3), ignoreIfExists = true)
-
- original.setCurrentDatabase(db1)
-
- // check if tables copied over
- val clone = original.newSessionCatalogWith(
- spark,
- new SQLConf,
- new Configuration(),
- new SimpleFunctionRegistry,
- CatalystSqlParser)
-
- // check if current db copied over
- assert(original ne clone)
- assert(clone.getCurrentDatabase == db1)
-
- // check if clone and original independent
- clone.setCurrentDatabase(db2)
- assert(original.getCurrentDatabase == db1)
- original.setCurrentDatabase(db3)
- assert(clone.getCurrentDatabase == db2)
- } finally {
- // Drop the created databases from the global singleton HiveSession.
- original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true)
- original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true)
- original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true)
- original.setCurrentDatabase(originalCurrentDatabase)
- }
- }
-
- def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
-
- def newDb(name: String): CatalogDatabase = {
- CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
- }
-}