author     Andrew Or <andrew@databricks.com>    2016-02-27 19:51:28 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-27 19:51:28 -0800
commit     cca79fad66c4315b0ed6de59fd87700a540e6646 (patch)
tree       ca305c4e716f0138e1130bc7d6ab3a5bcf3928cf /sql/hive
parent     4c5e968db23db41c0bea802819ebd75fad63bc2b (diff)
download   spark-cca79fad66c4315b0ed6de59fd87700a540e6646.tar.gz
           spark-cca79fad66c4315b0ed6de59fd87700a540e6646.tar.bz2
           spark-cca79fad66c4315b0ed6de59fd87700a540e6646.zip
[SPARK-13526][SQL] Move SQLContext per-session states to new class
## What changes were proposed in this pull request?

This creates a `SessionState`, which groups a few fields that existed in `SQLContext`. Because `HiveContext` extends `SQLContext` we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.

## How was this patch tested?

Existing unit tests; this patch introduces no change in behavior.

Author: Andrew Or <andrew@databricks.com>

Closes #11405 from andrewor14/refactor-session.
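To make the shape of the refactoring concrete, here is a minimal, self-contained Scala sketch of the pattern this patch applies: per-session fields move out of the context and behind a single lazy `sessionState`, and a Hive-flavoured subclass swaps in its own state object. The `Simple*` names and their internals below are illustrative stand-ins, not the actual `SQLConf`, `FunctionRegistry`, or `HiveMetastoreCatalog` definitions touched by this patch.

```scala
// Illustrative stand-ins for Spark's per-session components (not the real classes).
class SimpleFunctionRegistry {
  private var fns = Map.empty[String, Seq[Any] => Any]
  def registerFunction(name: String, fn: Seq[Any] => Any): Unit = fns += name -> fn
  def lookupFunction(name: String): Option[Seq[Any] => Any] = fns.get(name)
}
class SimpleCatalog(owner: AnyRef)

// All session-specific state is grouped behind one object; each field is a
// lazy val so subclasses can replace individual pieces.
class SimpleSessionState(ctx: SimpleSQLContext) {
  lazy val functionRegistry: SimpleFunctionRegistry = new SimpleFunctionRegistry
  lazy val catalog: SimpleCatalog = new SimpleCatalog(ctx)
}

// The context no longer declares catalog/functionRegistry itself; it delegates.
class SimpleSQLContext { self =>
  protected lazy val sessionState: SimpleSessionState = new SimpleSessionState(self)
  def functionRegistry: SimpleFunctionRegistry = sessionState.functionRegistry
  def catalog: SimpleCatalog = sessionState.catalog
}

// A Hive-flavoured session state overrides only the pieces that differ...
class SimpleHiveSessionState(ctx: SimpleSQLContext) extends SimpleSessionState(ctx) {
  override lazy val catalog: SimpleCatalog = new SimpleCatalog(ctx) // e.g. metastore-backed
}

// ...and the Hive context swaps in the whole state object in one place,
// mirroring how HiveContext overrides `sessionState` with a HiveSessionState.
class SimpleHiveContext extends SimpleSQLContext {
  override protected lazy val sessionState: SimpleSessionState = new SimpleHiveSessionState(this)
}
```

Callers then reach session-scoped pieces through the state object, which is why the tests in this diff change from `TestHive.functionRegistry` to `TestHive.sessionState.functionRegistry`.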
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala |   4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala |  81
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 103
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala |  30
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala |   7
5 files changed, 137 insertions(+), 88 deletions(-)
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d15f883138..0dc2a95eea 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// Enable in-memory partition pruning for testing purposes
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Use Hive hash expression instead of the native one
- TestHive.functionRegistry.unregisterFunction("hash")
+ TestHive.sessionState.functionRegistry.unregisterFunction("hash")
RuleExecutor.resetTime()
}
@@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- TestHive.functionRegistry.restore()
+ TestHive.sessionState.functionRegistry.restore()
// For debugging dump some statistics about how much time was spent in various optimizer rules.
logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d511dd685c..a9295d31c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource}
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
import org.apache.spark.sql.types._
@@ -110,6 +108,16 @@ class HiveContext private[hive](
isRootContext = false)
}
+ @transient
+ protected[sql] override lazy val sessionState = new HiveSessionState(self)
+
+ protected[sql] override def catalog = sessionState.catalog
+
+ // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
+ // but the optimizer can't access the SessionState of metadataHive.
+ sessionState.functionRegistry.registerFunction(
+ "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
+
/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -442,39 +450,6 @@ class HiveContext private[hive](
setConf(entry.key, entry.stringConverter(value))
}
- /* A catalyst metadata catalog that points to the Hive Metastore. */
- @transient
- override protected[sql] lazy val catalog =
- new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
-
- // Note that HiveUDFs will be overridden by functions registered in this context.
- @transient
- override protected[sql] lazy val functionRegistry: FunctionRegistry =
- new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
-
- // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
- // can't access the SessionState of metadataHive.
- functionRegistry.registerFunction(
- "current_database",
- (expressions: Seq[Expression]) => new CurrentDatabase(this))
-
- /* An analyzer that uses the Hive metastore. */
- @transient
- override protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, conf) {
- override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.CreateTables ::
- catalog.PreInsertionCasts ::
- python.ExtractPythonUDFs ::
- PreInsertCastAndRename ::
- (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
- override val extendedCheckRules = Seq(
- PreWriteCheck(catalog)
- )
- }
-
/** Overridden by child classes that need to set configuration before the client init. */
protected def configure(): Map[String, String] = {
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -544,37 +519,6 @@ class HiveContext private[hive](
c
}
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
- }
-
- @transient
- protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf)
-
- @transient
- private val hivePlanner = new SparkPlanner(this) with HiveStrategies {
- val hiveContext = self
-
- override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
- DataSourceStrategy,
- HiveCommandStrategy(self),
- HiveDDLStrategy,
- DDLStrategy,
- SpecialLimits,
- InMemoryScans,
- HiveTableScans,
- DataSinks,
- Scripts,
- Aggregation,
- LeftSemiJoin,
- EquiJoinSelection,
- BasicOperators,
- BroadcastNestedLoop,
- CartesianProduct,
- DefaultJoin
- )
- }
-
private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
@@ -590,9 +534,6 @@ class HiveContext private[hive](
}
}
- @transient
- override protected[sql] val planner = hivePlanner
-
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
new file mode 100644
index 0000000000..09f54be04d
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+
+
+/**
+ * A class that holds all session-specific state in a given [[HiveContext]].
+ */
+private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+
+ override lazy val conf: SQLConf = new SQLConf {
+ override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+ }
+
+ /**
+ * A metadata catalog that points to the Hive metastore.
+ */
+ override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+
+ /**
+ * Internal catalog for managing functions registered by the user.
+ * Note that HiveUDFs will be overridden by functions registered in this context.
+ */
+ override lazy val functionRegistry: FunctionRegistry = {
+ new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
+ }
+
+ /**
+ * An analyzer that uses the Hive metastore.
+ */
+ override lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, functionRegistry, conf) {
+ override val extendedResolutionRules =
+ catalog.ParquetConversions ::
+ catalog.CreateTables ::
+ catalog.PreInsertionCasts ::
+ python.ExtractPythonUDFs ::
+ PreInsertCastAndRename ::
+ (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+ override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+ }
+ }
+
+ /**
+ * Parser for HiveQl query texts.
+ */
+ override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+
+ /**
+ * Planner that takes into account Hive-specific strategies.
+ */
+ override lazy val planner: SparkPlanner = {
+ new SparkPlanner(ctx) with HiveStrategies {
+ override val hiveContext = ctx
+
+ override def strategies: Seq[Strategy] = {
+ ctx.experimental.extraStrategies ++ Seq(
+ DataSourceStrategy,
+ HiveCommandStrategy(ctx),
+ HiveDDLStrategy,
+ DDLStrategy,
+ SpecialLimits,
+ InMemoryScans,
+ HiveTableScans,
+ DataSinks,
+ Scripts,
+ Aggregation,
+ LeftSemiJoin,
+ EquiJoinSelection,
+ BasicOperators,
+ BroadcastNestedLoop,
+ CartesianProduct,
+ DefaultJoin
+ )
+ }
+ }
+ }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9d0622bea9..a7eca46d19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,18 +120,25 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
override def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
- protected[sql] override lazy val conf: SQLConf = new SQLConf {
- override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
- clear()
-
- override def clear(): Unit = {
- super.clear()
-
- TestHiveContext.overrideConfs.map {
- case (key, value) => setConfString(key, value)
+ @transient
+ protected[sql] override lazy val sessionState = new HiveSessionState(this) {
+ override lazy val conf: SQLConf = {
+ new SQLConf {
+ clear()
+ override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+ override def clear(): Unit = {
+ super.clear()
+ TestHiveContext.overrideConfs.map {
+ case (key, value) => setConfString(key, value)
+ }
+ }
}
}
+
+ override lazy val functionRegistry = {
+ new TestHiveFunctionRegistry(
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+ }
}
/**
@@ -454,9 +461,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
}
}
- @transient
- override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
}
private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6e4b..d81c566822 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
test("Caching converted data source Parquet Relations") {
- def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+ val _catalog = catalog
+ def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
@@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+ var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
// First, make sure the converted test_parquet is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
@@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
- tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+ tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""