From 5cefecc95a5b8418713516802c416cfde5a94a2d Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 16 Apr 2016 14:00:53 -0700
Subject: [SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState

## What changes were proposed in this pull request?

This patch adds a SharedState that groups state shared across multiple
SQLContexts. This is analogous to the SessionState added in SPARK-13526
that groups session-specific state. This cleanup makes the constructors
of the contexts simpler and ultimately allows us to remove HiveContext
in the near future.

## How was this patch tested?

Existing tests.

Closes #12405

Author: Andrew Or
Author: Yin Huai

Closes #12447 from yhuai/sharedState.
---
 .../org/apache/spark/sql/hive/HiveContext.scala    | 51 ++++---------
 .../apache/spark/sql/hive/HiveSessionState.scala   | 15 +++-
 .../apache/spark/sql/hive/HiveSharedState.scala    | 53 +++++++++++++
 .../org/apache/spark/sql/hive/test/TestHive.scala  | 86 ++++++++--------
 4 files changed, 110 insertions(+), 95 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala

(limited to 'sql/hive/src/main')

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 7ebdad1a68..e366743118 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -49,12 +49,10 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -67,32 +65,14 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    @transient private[hive] val executionHive: HiveClientImpl,
-    @transient private[hive] val metadataHive: HiveClient,
-    isRootContext: Boolean,
-    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
-  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
-  self =>
+    @transient protected[hive] val hiveSharedState: HiveSharedState,
+    override val isRootContext: Boolean)
+  extends SQLContext(hiveSharedState, isRootContext) with Logging {
 
-  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
-    this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      execHive,
-      metaHive,
-      true,
-      new HiveExternalCatalog(metaHive))
-  }
+  self =>
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
+    this(new HiveSharedState(sc), true)
   }
 
   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -107,19 +87,16 @@ class HiveContext private[hive](
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog)
+    new HiveContext(hiveSharedState, isRootContext = false)
   }
 
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
+  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
+  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
+  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -163,7 +140,7 @@ class HiveContext private[hive](
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -601,7 +578,9 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
+  protected[hive] def newClientForMetadata(
+      conf: SparkConf,
+      hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b992fda18c..bc28b55d06 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,10 +18,11 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
@@ -31,6 +32,16 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
 
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
+
+  /**
+   * A Hive client used for interacting with the metastore.
+   */
+  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
+
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
new file mode 100644
index 0000000000..11097c33df
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.internal.SharedState
+
+
+/**
+ * A class that holds all state shared across sessions in a given [[HiveContext]].
+ */
+private[hive] class HiveSharedState(override val sparkContext: SparkContext)
+  extends SharedState(sparkContext) {
+
+  // TODO: just share the IsolatedClientLoader instead of the client instances themselves
+
+  /**
+   * A Hive client used for execution.
+   */
+  val executionHive: HiveClientImpl = {
+    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
+  }
+
+  /**
+   * A Hive client used to interact with the metastore.
+   */
+  // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
+  lazy val metadataHive: HiveClient = {
+    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
+  }
+
+  /**
+   * A catalog that interacts with the Hive metastore.
+   */
+  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 7f6ca21782..d56d36fe32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,63 +72,24 @@ object TestHive
  * test cases that rely on TestHive must be serialized.
  */
 class TestHiveContext private[hive](
-    sc: SparkContext,
-    cacheManager: CacheManager,
-    listener: SQLListener,
-    executionHive: HiveClientImpl,
-    metadataHive: HiveClient,
-    isRootContext: Boolean,
-    hiveCatalog: HiveExternalCatalog,
+    testHiveSharedState: TestHiveSharedState,
     val warehousePath: File,
     val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String])
-  extends HiveContext(
-    sc,
-    cacheManager,
-    listener,
-    executionHive,
-    metadataHive,
-    isRootContext,
-    hiveCatalog) { self =>
-
-  // Unfortunately, due to the complex interactions between the construction parameters
-  // and the limitations in scala constructors, we need many of these constructors to
-  // provide a shorthand to create a new TestHiveContext with only a SparkContext.
-  // This is not a great design pattern but it's necessary here.
+    metastoreTemporaryConf: Map[String, String],
+    isRootContext: Boolean)
+  extends HiveContext(testHiveSharedState, isRootContext) { self =>
 
   private def this(
       sc: SparkContext,
-      executionHive: HiveClientImpl,
-      metadataHive: HiveClient,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]) {
     this(
-      sc,
-      new CacheManager,
-      SQLContext.createListenerAndUI(sc),
-      executionHive,
-      metadataHive,
-      true,
-      new HiveExternalCatalog(metadataHive),
+      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
       warehousePath,
       scratchDirPath,
-      metastoreTemporaryConf)
-  }
-
-  private def this(
-      sc: SparkContext,
-      warehousePath: File,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]) {
-    this(
-      sc,
-      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
-      TestHiveContext.newClientForMetadata(
-        sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf)
+      metastoreTemporaryConf,
+      true)
   }
 
   def this(sc: SparkContext) {
@@ -141,16 +102,11 @@ class TestHiveContext private[hive](
 
   override def newSession(): HiveContext = {
     new TestHiveContext(
-      sc = sc,
-      cacheManager = cacheManager,
-      listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
-      isRootContext = false,
-      hiveCatalog = hiveCatalog,
-      warehousePath = warehousePath,
-      scratchDirPath = scratchDirPath,
-      metastoreTemporaryConf = metastoreTemporaryConf)
+      testHiveSharedState,
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf,
+      isRootContext = false)
   }
 
   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
@@ -549,6 +505,22 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
   }
 }
 
+
+private[hive] class TestHiveSharedState(
+    sc: SparkContext,
+    warehousePath: File,
+    scratchDirPath: File,
+    metastoreTemporaryConf: Map[String, String])
+  extends HiveSharedState(sc) {
+
+  override lazy val metadataHive: HiveClient = {
+    TestHiveContext.newClientForMetadata(
+      sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
+  }
+
+}
+
+
 private[hive] object TestHiveContext {
 
   /**
@@ -563,7 +535,7 @@ private[hive] object TestHiveContext {
   /**
    * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
    */
-  private def newClientForMetadata(
+  def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
       warehousePath: File,
--
cgit v1.2.3
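
Note (editor's addition, not part of the patch): a minimal Scala sketch of how the
refactored constructor chain behaves after this commit. It assumes a Hive-enabled
Spark build of this era running in local mode; the app name, the table name
demo_table, and the asserted values are hypothetical illustrations, not code from
the patch.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(
      new SparkConf().setAppName("shared-state-sketch").setMaster("local[2]"))

    // The public constructor builds a fresh HiveSharedState and marks this
    // context as the root context.
    val rootCtx = new HiveContext(sc)

    // newSession() reuses rootCtx's HiveSharedState: the external catalog and
    // the other state grouped in SharedState are shared, while SQLConf,
    // temporary tables, and the per-session Hive clients held by
    // HiveSessionState are not.
    val session = rootCtx.newSession()

    // A catalog write through one context is visible from the other, because
    // the external catalog lives in shared state.
    rootCtx.sql("CREATE TABLE IF NOT EXISTS demo_table (x INT)")
    assert(session.tableNames().contains("demo_table"))

    // A session-local conf change does not leak back to the root context.
    session.setConf("spark.sql.shuffle.partitions", "7")
    assert(rootCtx.getConf("spark.sql.shuffle.partitions") != "7")

The split the sketch relies on is the one this patch draws: HiveSharedState owns
objects that must outlive any single session (the external catalog and the shared
Hive clients), while HiveSessionState forks its own executionHive and metadataHive
via newSession(), so sessions stay isolated without rebuilding shared machinery.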