author    Andrew Or <andrew@databricks.com>  2016-04-18 13:15:23 -0700
committer Yin Huai <yhuai@databricks.com>    2016-04-18 13:15:23 -0700
commit    28ee15702d9efd52a26a065c6e544b5345a8f65d (patch)
tree      f097fa7ac83c4540e262da71d302aebc4b746daf /sql/hive/src/main/scala/org/apache
parent    e4ae974294fc61f03b235f82d1618f29cad8feee (diff)
[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState
## What changes were proposed in this pull request?

This patch adds a SharedState that groups state shared across multiple SQLContexts. It is analogous to the SessionState added in SPARK-13526, which groups session-specific state. This cleanup simplifies the constructors of the contexts and ultimately allows us to remove HiveContext in the near future.

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12463 from yhuai/sharedState.
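For readers skimming the diff below, here is a minimal, self-contained sketch of the split this patch establishes between shared and session-local state. The classes are stand-ins, not the actual Spark sources, and the members are placeholders for the state each layer holds:

```scala
// Sketch only: stand-ins for the real SharedState/SessionState classes.
class SharedState {
  // In Spark: cache manager, SQL listener/UI, external catalog, ...
  lazy val externalCatalog: AnyRef = new Object
}

class SessionState(val shared: SharedState) {
  // In Spark: per-session conf, analyzer, parser, ...
  val conf = scala.collection.mutable.Map.empty[String, String]
}

class Context(val sharedState: SharedState) {
  val sessionState = new SessionState(sharedState)
  // A new session reuses the SharedState but gets a fresh SessionState.
  def newSession(): Context = new Context(sharedState)
}

object Demo extends App {
  val root  = new Context(new SharedState)
  val child = root.newSession()
  assert(root.sharedState eq child.sharedState)   // shared across sessions
  assert(root.sessionState ne child.sessionState) // isolated per session
}
```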
Diffstat (limited to 'sql/hive/src/main/scala/org/apache')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala      | 51
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala  | 53
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala    | 86
4 files changed, 110 insertions(+), 95 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 42cda0be16..71ef99a6a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,12 +45,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -63,32 +61,14 @@ import org.apache.spark.util.Utils
* @since 1.0.0
*/
class HiveContext private[hive](
- sc: SparkContext,
- cacheManager: CacheManager,
- listener: SQLListener,
- @transient private[hive] val executionHive: HiveClientImpl,
- @transient private[hive] val metadataHive: HiveClient,
- isRootContext: Boolean,
- @transient private[sql] val hiveCatalog: HiveExternalCatalog)
- extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
- self =>
+ @transient protected[hive] val hiveSharedState: HiveSharedState,
+ override val isRootContext: Boolean)
+ extends SQLContext(hiveSharedState, isRootContext) with Logging {
- private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
- this(
- sc,
- new CacheManager,
- SQLContext.createListenerAndUI(sc),
- execHive,
- metaHive,
- true,
- new HiveExternalCatalog(metaHive))
- }
+ self =>
def this(sc: SparkContext) = {
- this(
- sc,
- HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
- HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
+ this(new HiveSharedState(sc), true)
}
def this(sc: JavaSparkContext) = this(sc.sc)
@@ -103,19 +83,16 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
- new HiveContext(
- sc = sc,
- cacheManager = cacheManager,
- listener = listener,
- executionHive = executionHive.newSession(),
- metadataHive = metadataHive.newSession(),
- isRootContext = false,
- hiveCatalog = hiveCatalog)
+ new HiveContext(hiveSharedState, isRootContext = false)
}
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)
+ protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
+ protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
+ protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
+
/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -159,7 +136,7 @@ class HiveContext private[hive](
protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
protected[hive] def hiveThriftServerSingleSession: Boolean =
- sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+ sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
@transient
protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -527,7 +504,9 @@ private[hive] object HiveContext extends Logging {
* The version of the Hive client that is used here must match the metastore that is configured
* in the hive-site.xml file.
*/
- private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+ protected[hive] def newClientForMetadata(
+ conf: SparkConf,
+ hadoopConf: Configuration): HiveClient = {
val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
val configurations = hiveClientConfigurations(hiveConf)
newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
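One incidental cleanup in the hunks above is worth a note: the Thrift-server flag is now read with SparkConf.getBoolean rather than get(...).toBoolean. A small usage sketch (the ConfDemo object is ours; the key is the one from the diff):

```scala
import org.apache.spark.SparkConf

object ConfDemo extends App {
  val conf = new SparkConf(loadDefaults = false)

  // Before: fetch a string default, then parse it by hand.
  val before = conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

  // After: typed accessor with an explicit default, as in the patched line.
  val after = conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)

  assert(before == after) // both read false when the key is unset
}
```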
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b992fda18c..bc28b55d06 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,10 +18,11 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.hive.execution.HiveSqlParser
import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -31,6 +32,16 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
*/
private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+ /**
+ * A Hive client used for execution.
+ */
+ val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
+
+ /**
+ * A Hive client used for interacting with the metastore.
+ */
+ val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
+
override lazy val conf: SQLConf = new SQLConf {
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
}
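The two vals introduced above fork per-session Hive clients from the shared instances held by HiveSharedState, so each session talks to the metastore through its own handle. A minimal sketch of that forking pattern (the Client trait below is a stand-in, not the real HiveClient interface):

```scala
// Stand-in for HiveClient: newSession() yields an isolated per-session
// handle that still shares the root client's underlying resources.
trait Client { def newSession(): Client }

class RootClient extends Client {
  override def newSession(): Client = new RootClient
}

class SessionClients(sharedClient: Client) {
  // Mirrors: val metadataHive = ctx.hiveSharedState.metadataHive.newSession()
  val metadataHive: Client = sharedClient.newSession()
}

object ClientDemo extends App {
  val shared = new RootClient
  val s1 = new SessionClients(shared)
  val s2 = new SessionClients(shared)
  assert(s1.metadataHive ne s2.metadataHive) // each session owns its handle
}
```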
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
new file mode 100644
index 0000000000..11097c33df
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.internal.SharedState
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[HiveContext]].
+ */
+private[hive] class HiveSharedState(override val sparkContext: SparkContext)
+ extends SharedState(sparkContext) {
+
+ // TODO: just share the IsolatedClientLoader instead of the client instances themselves
+
+ /**
+ * A Hive client used for execution.
+ */
+ val executionHive: HiveClientImpl = {
+ HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
+ }
+
+ /**
+ * A Hive client used to interact with the metastore.
+ */
+ // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
+ lazy val metadataHive: HiveClient = {
+ HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
+ }
+
+ /**
+ * A catalog that interacts with the Hive metastore.
+ */
+ override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
+
+}
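The inline comment about metadataHive reflects a general Scala constraint: a strict val cannot be overridden by a lazy val, and keeping it lazy also means the expensive default client is never constructed when a subclass (here TestHiveSharedState) supplies its own. A self-contained sketch of that behavior (class names are ours):

```scala
class Base {
  // Were this a strict `val`, it could not be overridden lazily, and the
  // expensive default would be built during Base's initialization anyway.
  lazy val client: String = { println("building expensive default client"); "default" }
}

class TestBase extends Base {
  override lazy val client: String = "test" // Base's initializer never runs
}

object LazyDemo extends App {
  println(new TestBase().client) // prints "test"; the default is never built
}
```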
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7f6ca21782..d56d36fe32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,63 +72,24 @@ object TestHive
* test cases that rely on TestHive must be serialized.
*/
class TestHiveContext private[hive](
- sc: SparkContext,
- cacheManager: CacheManager,
- listener: SQLListener,
- executionHive: HiveClientImpl,
- metadataHive: HiveClient,
- isRootContext: Boolean,
- hiveCatalog: HiveExternalCatalog,
+ testHiveSharedState: TestHiveSharedState,
val warehousePath: File,
val scratchDirPath: File,
- metastoreTemporaryConf: Map[String, String])
- extends HiveContext(
- sc,
- cacheManager,
- listener,
- executionHive,
- metadataHive,
- isRootContext,
- hiveCatalog) { self =>
-
- // Unfortunately, due to the complex interactions between the construction parameters
- // and the limitations in scala constructors, we need many of these constructors to
- // provide a shorthand to create a new TestHiveContext with only a SparkContext.
- // This is not a great design pattern but it's necessary here.
+ metastoreTemporaryConf: Map[String, String],
+ isRootContext: Boolean)
+ extends HiveContext(testHiveSharedState, isRootContext) { self =>
private def this(
sc: SparkContext,
- executionHive: HiveClientImpl,
- metadataHive: HiveClient,
warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String]) {
this(
- sc,
- new CacheManager,
- SQLContext.createListenerAndUI(sc),
- executionHive,
- metadataHive,
- true,
- new HiveExternalCatalog(metadataHive),
+ new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
warehousePath,
scratchDirPath,
- metastoreTemporaryConf)
- }
-
- private def this(
- sc: SparkContext,
- warehousePath: File,
- scratchDirPath: File,
- metastoreTemporaryConf: Map[String, String]) {
- this(
- sc,
- HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
- TestHiveContext.newClientForMetadata(
- sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
- warehousePath,
- scratchDirPath,
- metastoreTemporaryConf)
+ metastoreTemporaryConf,
+ true)
}
def this(sc: SparkContext) {
@@ -141,16 +102,11 @@ class TestHiveContext private[hive](
override def newSession(): HiveContext = {
new TestHiveContext(
- sc = sc,
- cacheManager = cacheManager,
- listener = listener,
- executionHive = executionHive.newSession(),
- metadataHive = metadataHive.newSession(),
- isRootContext = false,
- hiveCatalog = hiveCatalog,
- warehousePath = warehousePath,
- scratchDirPath = scratchDirPath,
- metastoreTemporaryConf = metastoreTemporaryConf)
+ testHiveSharedState,
+ warehousePath,
+ scratchDirPath,
+ metastoreTemporaryConf,
+ isRootContext = false)
}
// By clearing the port we force Spark to pick a new one. This allows us to rerun tests
@@ -549,6 +505,22 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
}
}
+
+private[hive] class TestHiveSharedState(
+ sc: SparkContext,
+ warehousePath: File,
+ scratchDirPath: File,
+ metastoreTemporaryConf: Map[String, String])
+ extends HiveSharedState(sc) {
+
+ override lazy val metadataHive: HiveClient = {
+ TestHiveContext.newClientForMetadata(
+ sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
+ }
+
+}
+
+
private[hive] object TestHiveContext {
/**
@@ -563,7 +535,7 @@ private[hive] object TestHiveContext {
/**
* Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
*/
- private def newClientForMetadata(
+ def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration,
warehousePath: File,