aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2016-02-27 19:51:28 -0800
committerReynold Xin <rxin@databricks.com>2016-02-27 19:51:28 -0800
commitcca79fad66c4315b0ed6de59fd87700a540e6646 (patch)
treeca305c4e716f0138e1130bc7d6ab3a5bcf3928cf
parent4c5e968db23db41c0bea802819ebd75fad63bc2b (diff)
downloadspark-cca79fad66c4315b0ed6de59fd87700a540e6646.tar.gz
spark-cca79fad66c4315b0ed6de59fd87700a540e6646.tar.bz2
spark-cca79fad66c4315b0ed6de59fd87700a540e6646.zip
[SPARK-13526][SQL] Move SQLContext per-session states to new class
## What changes were proposed in this pull request? This creates a `SessionState`, which groups a few fields that existed in `SQLContext`. Because `HiveContext` extends `SQLContext` we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts. ## How was this patch tested? Existing unit tests; this patch introduces no change in behavior. Author: Andrew Or <andrew@databricks.com> Closes #11405 from andrewor14/refactor-session.
-rw-r--r--project/MimaExcludes.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala88
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala111
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala22
-rw-r--r--sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala81
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala103
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala30
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala7
10 files changed, 294 insertions, 164 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 165280a1b2..9ce37fc753 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -271,13 +271,18 @@ object MimaExcludes {
) ++ Seq(
// SPARK-13220 Deprecate yarn-client and yarn-cluster mode
ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler"),
+ "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+ ) ++ Seq(
// SPARK-13465 TaskContext.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.addTaskFailureListener")
) ++ Seq (
// SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
) ++ Seq(
+ // SPARK-13526 Move SQLContext per-session states to new class
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.sql.UDFRegistration.this")
+ ) ++ Seq(
// [SPARK-13486][SQL] Move SQLConf into an internal package
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e4ae..cb4a6397b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.catalyst.{InternalRow, _}
+import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
@@ -68,7 +66,7 @@ class SQLContext private[sql](
@transient protected[sql] val cacheManager: CacheManager,
@transient private[sql] val listener: SQLListener,
val isRootContext: Boolean)
- extends org.apache.spark.Logging with Serializable {
+ extends Logging with Serializable {
self =>
@@ -115,9 +113,27 @@ class SQLContext private[sql](
}
/**
- * @return Spark SQL configuration
+ * Per-session state, e.g. configuration, functions, temporary tables etc.
*/
- protected[sql] lazy val conf = new SQLConf
+ @transient
+ protected[sql] lazy val sessionState: SessionState = new SessionState(self)
+ protected[sql] def conf: SQLConf = sessionState.conf
+ protected[sql] def catalog: Catalog = sessionState.catalog
+ protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry
+ protected[sql] def analyzer: Analyzer = sessionState.analyzer
+ protected[sql] def optimizer: Optimizer = sessionState.optimizer
+ protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser
+ protected[sql] def planner: SparkPlanner = sessionState.planner
+ protected[sql] def continuousQueryManager = sessionState.continuousQueryManager
+ protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] =
+ sessionState.prepareForExecution
+
+ /**
+ * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+ * that listen for execution metrics.
+ */
+ @Experimental
+ def listenerManager: ExecutionListenerManager = sessionState.listenerManager
/**
* Set Spark SQL configuration properties.
@@ -179,43 +195,11 @@ class SQLContext private[sql](
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
- @transient
- lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
- protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)
-
- @transient
- protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
-
- @transient
- protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
-
- @transient
- protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, conf) {
- override val extendedResolutionRules =
- python.ExtractPythonUDFs ::
- PreInsertCastAndRename ::
- (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
- override val extendedCheckRules = Seq(
- datasources.PreWriteCheck(catalog)
- )
- }
-
- @transient
- protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
-
- @transient
- protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
-
protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
- protected[sql] def executeSql(sql: String):
- org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
+ protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
- protected[sql] def executePlan(plan: LogicalPlan) =
- new sparkexecution.QueryExecution(this, plan)
+ protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
/**
* Add a jar to SQLContext
@@ -299,10 +283,8 @@ class SQLContext private[sql](
*
* @group basic
* @since 1.3.0
- * TODO move to SQLSession?
*/
- @transient
- val udf: UDFRegistration = new UDFRegistration(this)
+ def udf: UDFRegistration = sessionState.udf
/**
* Returns true if the table is currently cached in-memory.
@@ -873,25 +855,9 @@ class SQLContext private[sql](
}
@transient
- protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
-
- @transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
/**
- * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
- * row format conversions as needed.
- */
- @transient
- protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
- val batches = Seq(
- Batch("Subquery", Once, PlanSubqueries(self)),
- Batch("Add exchange", Once, EnsureRequirements(self)),
- Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
- )
- }
-
- /**
* Parses the data type in our internal string representation. The data type string should
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index de01cbcb0e..d894825632 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -22,6 +22,7 @@ import scala.util.Try
import org.apache.spark.Logging
import org.apache.spark.sql.api.java._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType
*
* @since 1.3.0
*/
-class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
- private val functionRegistry = sqlContext.functionRegistry
+class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging {
protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
log.debug(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
new file mode 100644
index 0000000000..f93a405f77
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration}
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */
+private[sql] class SessionState(ctx: SQLContext) {
+
+ // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+ // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+ /**
+ * SQL-specific key-value configurations.
+ */
+ lazy val conf = new SQLConf
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ lazy val catalog: Catalog = new SimpleCatalog(conf)
+
+ /**
+ * Internal catalog for managing functions registered by the user.
+ */
+ lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+
+ /**
+ * Interface exposed to the user for registering user-defined functions.
+ */
+ lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+
+ /**
+ * Logical query plan analyzer for resolving unresolved attributes and relations.
+ */
+ lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, functionRegistry, conf) {
+ override val extendedResolutionRules =
+ python.ExtractPythonUDFs ::
+ PreInsertCastAndRename ::
+ (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+ override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+ }
+ }
+
+ /**
+ * Logical query plan optimizer.
+ */
+ lazy val optimizer: Optimizer = new SparkOptimizer(ctx)
+
+ /**
+ * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+ */
+ lazy val sqlParser: ParserInterface = new SparkQl(conf)
+
+ /**
+ * Planner that converts optimized logical plans to physical plans.
+ */
+ lazy val planner: SparkPlanner = new SparkPlanner(ctx)
+
+ /**
+ * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+ * row format conversions as needed.
+ */
+ lazy val prepareForExecution = new RuleExecutor[SparkPlan] {
+ override val batches: Seq[Batch] = Seq(
+ Batch("Subquery", Once, PlanSubqueries(ctx)),
+ Batch("Add exchange", Once, EnsureRequirements(ctx)),
+ Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx))
+ )
+ }
+
+ /**
+ * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+ * that listen for execution metrics.
+ */
+ lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
+
+ /**
+ * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+ */
+ lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 28ad7ae64a..b3e146fba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
/**
* A special [[SQLContext]] prepared for testing.
@@ -31,16 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
new SparkConf().set("spark.sql.testkey", "true")))
}
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
-
- clear()
-
- override def clear(): Unit = {
- super.clear()
-
- // Make sure we start with the default test configs even after clear
- TestSQLContext.overrideConfs.foreach {
- case (key, value) => setConfString(key, value)
+ @transient
+ protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
+ override lazy val conf: SQLConf = {
+ new SQLConf {
+ clear()
+ override def clear(): Unit = {
+ super.clear()
+ // Make sure we start with the default test configs even after clear
+ TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+ }
}
}
}
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d15f883138..0dc2a95eea 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Enable in-memory partition pruning for testing purposes
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Use Hive hash expression instead of the native one
- TestHive.functionRegistry.unregisterFunction("hash")
+ TestHive.sessionState.functionRegistry.unregisterFunction("hash")
RuleExecutor.resetTime()
}
@@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- TestHive.functionRegistry.restore()
+ TestHive.sessionState.functionRegistry.restore()
// For debugging dump some statistics about how much time was spent in various optimizer rules.
logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d511dd685c..a9295d31c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource}
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
import org.apache.spark.sql.types._
@@ -110,6 +108,16 @@ class HiveContext private[hive](
isRootContext = false)
}
+ @transient
+ protected[sql] override lazy val sessionState = new HiveSessionState(self)
+
+ protected[sql] override def catalog = sessionState.catalog
+
+ // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
+ // but the optimizer can't access the SessionState of metadataHive.
+ sessionState.functionRegistry.registerFunction(
+ "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
+
/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -442,39 +450,6 @@ class HiveContext private[hive](
setConf(entry.key, entry.stringConverter(value))
}
- /* A catalyst metadata catalog that points to the Hive Metastore. */
- @transient
- override protected[sql] lazy val catalog =
- new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
-
- // Note that HiveUDFs will be overridden by functions registered in this context.
- @transient
- override protected[sql] lazy val functionRegistry: FunctionRegistry =
- new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
-
- // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
- // can't access the SessionState of metadataHive.
- functionRegistry.registerFunction(
- "current_database",
- (expressions: Seq[Expression]) => new CurrentDatabase(this))
-
- /* An analyzer that uses the Hive metastore. */
- @transient
- override protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, conf) {
- override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.CreateTables ::
- catalog.PreInsertionCasts ::
- python.ExtractPythonUDFs ::
- PreInsertCastAndRename ::
- (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
- override val extendedCheckRules = Seq(
- PreWriteCheck(catalog)
- )
- }
-
/** Overridden by child classes that need to set configuration before the client init. */
protected def configure(): Map[String, String] = {
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -544,37 +519,6 @@ class HiveContext private[hive](
c
}
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
- }
-
- @transient
- protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf)
-
- @transient
- private val hivePlanner = new SparkPlanner(this) with HiveStrategies {
- val hiveContext = self
-
- override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
- DataSourceStrategy,
- HiveCommandStrategy(self),
- HiveDDLStrategy,
- DDLStrategy,
- SpecialLimits,
- InMemoryScans,
- HiveTableScans,
- DataSinks,
- Scripts,
- Aggregation,
- LeftSemiJoin,
- EquiJoinSelection,
- BasicOperators,
- BroadcastNestedLoop,
- CartesianProduct,
- DefaultJoin
- )
- }
-
private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
@@ -590,9 +534,6 @@ class HiveContext private[hive](
}
}
- @transient
- override protected[sql] val planner = hivePlanner
-
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
new file mode 100644
index 0000000000..09f54be04d
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+
+
+/**
+ * A class that holds all session-specific state in a given [[HiveContext]].
+ */
+private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+
+ override lazy val conf: SQLConf = new SQLConf {
+ override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+ }
+
+ /**
+ * A metadata catalog that points to the Hive metastore.
+ */
+ override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+
+ /**
+ * Internal catalog for managing functions registered by the user.
+ * Note that HiveUDFs will be overridden by functions registered in this context.
+ */
+ override lazy val functionRegistry: FunctionRegistry = {
+ new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
+ }
+
+ /**
+ * An analyzer that uses the Hive metastore.
+ */
+ override lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, functionRegistry, conf) {
+ override val extendedResolutionRules =
+ catalog.ParquetConversions ::
+ catalog.CreateTables ::
+ catalog.PreInsertionCasts ::
+ python.ExtractPythonUDFs ::
+ PreInsertCastAndRename ::
+ (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+ override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+ }
+ }
+
+ /**
+ * Parser for HiveQl query texts.
+ */
+ override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+
+ /**
+ * Planner that takes into account Hive-specific strategies.
+ */
+ override lazy val planner: SparkPlanner = {
+ new SparkPlanner(ctx) with HiveStrategies {
+ override val hiveContext = ctx
+
+ override def strategies: Seq[Strategy] = {
+ ctx.experimental.extraStrategies ++ Seq(
+ DataSourceStrategy,
+ HiveCommandStrategy(ctx),
+ HiveDDLStrategy,
+ DDLStrategy,
+ SpecialLimits,
+ InMemoryScans,
+ HiveTableScans,
+ DataSinks,
+ Scripts,
+ Aggregation,
+ LeftSemiJoin,
+ EquiJoinSelection,
+ BasicOperators,
+ BroadcastNestedLoop,
+ CartesianProduct,
+ DefaultJoin
+ )
+ }
+ }
+ }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9d0622bea9..a7eca46d19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,18 +120,25 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
override def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
- clear()
-
- override def clear(): Unit = {
- super.clear()
-
- TestHiveContext.overrideConfs.map {
- case (key, value) => setConfString(key, value)
+ @transient
+ protected[sql] override lazy val sessionState = new HiveSessionState(this) {
+ override lazy val conf: SQLConf = {
+ new SQLConf {
+ clear()
+ override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+ override def clear(): Unit = {
+ super.clear()
+ TestHiveContext.overrideConfs.map {
+ case (key, value) => setConfString(key, value)
+ }
+ }
}
}
+
+ override lazy val functionRegistry = {
+ new TestHiveFunctionRegistry(
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+ }
}
/**
@@ -454,9 +461,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
}
}
- @transient
- override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
}
private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6e4b..d81c566822 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
test("Caching converted data source Parquet Relations") {
- def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+ val _catalog = catalog
+ def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
@@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+ var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
@@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+ tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""