From 7de06a646dff7ede520d2e982ac0996d8c184650 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sun, 17 Apr 2016 17:35:41 -0700
Subject: Revert "[SPARK-14647][SQL] Group SQLContext/HiveContext state into SharedState"

This reverts commit 5cefecc95a5b8418713516802c416cfde5a94a2d.
---
 .../scala/org/apache/spark/sql/SQLContext.scala    | 31 ++++----
 .../apache/spark/sql/internal/SessionState.scala   |  2 +
 .../apache/spark/sql/internal/SharedState.scala    | 47 ------------
 .../org/apache/spark/sql/hive/HiveContext.scala    | 51 +++++++++----
 .../apache/spark/sql/hive/HiveSessionState.scala   | 15 +---
 .../apache/spark/sql/hive/HiveSharedState.scala    | 53 -------------
 .../org/apache/spark/sql/hive/test/TestHive.scala  | 86 ++++++++++++++--------
 .../spark/sql/hive/HiveExternalCatalogSuite.scala  | 12 ++-
 8 files changed, 122 insertions(+), 175 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
 delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 781d699819..9259ff4062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -63,14 +63,17 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient protected[sql] val sharedState: SharedState,
-    val isRootContext: Boolean)
+    @transient val sparkContext: SparkContext,
+    @transient protected[sql] val cacheManager: CacheManager,
+    @transient private[sql] val listener: SQLListener,
+    val isRootContext: Boolean,
+    @transient private[sql] val externalCatalog: ExternalCatalog)
   extends Logging with Serializable {

   self =>

   def this(sc: SparkContext) = {
-    this(new SharedState(sc), true)
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog)
   }

   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,20 +100,20 @@ class SQLContext private[sql](
     }
   }

-  def sparkContext: SparkContext = sharedState.sparkContext
-
-  protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
-  protected[sql] def listener: SQLListener = sharedState.listener
-  protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
   /**
-   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
-   * tables, registered functions, but sharing the same [[SparkContext]], cached data and
-   * other things.
+   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
+   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
   *
   * @since 1.6.0
   */
-  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
+  def newSession(): SQLContext = {
+    new SQLContext(
+      sparkContext = sparkContext,
+      cacheManager = cacheManager,
+      listener = listener,
+      isRootContext = false,
+      externalCatalog = externalCatalog)
+  }

   /**
    * Per-session state, e.g. configuration, functions, temporary tables etc.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index d404a7c0ae..c30f879ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.util.ExecutionListenerManager

 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
deleted file mode 100644
index 9a30c7de1f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
-import org.apache.spark.sql.execution.CacheManager
-import org.apache.spark.sql.execution.ui.SQLListener
-
-
-/**
- * A class that holds all state shared across sessions in a given [[SQLContext]].
- */
-private[sql] class SharedState(val sparkContext: SparkContext) {
-
-  /**
-   * Class for caching query results reused in future executions.
-   */
-  val cacheManager: CacheManager = new CacheManager
-
-  /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = SQLContext.createListenerAndUI(sparkContext)
-
-  /**
-   * A catalog that interacts with external systems.
-   */
-  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 71ef99a6a9..42cda0be16 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,10 +45,12 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{AnalyzeTable, DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.{SharedState, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -61,14 +63,32 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class HiveContext private[hive](
-    @transient protected[hive] val hiveSharedState: HiveSharedState,
-    override val isRootContext: Boolean)
-  extends SQLContext(hiveSharedState, isRootContext) with Logging {
-
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    listener: SQLListener,
+    @transient private[hive] val executionHive: HiveClientImpl,
+    @transient private[hive] val metadataHive: HiveClient,
+    isRootContext: Boolean,
+    @transient private[sql] val hiveCatalog: HiveExternalCatalog)
+  extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging {
   self =>

+  private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) {
+    this(
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      execHive,
+      metaHive,
+      true,
+      new HiveExternalCatalog(metaHive))
+  }
+
   def this(sc: SparkContext) = {
-    this(new HiveSharedState(sc), true)
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration))
   }

   def this(sc: JavaSparkContext) = this(sc.sc)
@@ -83,16 +103,19 @@ class HiveContext private[hive](
   * and Hive client (both of execution and metadata) with existing HiveContext.
   */
  override def newSession(): HiveContext = {
-    new HiveContext(hiveSharedState, isRootContext = false)
+    new HiveContext(
+      sc = sc,
+      cacheManager = cacheManager,
+      listener = listener,
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog)
   }

   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)

-  protected[hive] def hiveCatalog: HiveExternalCatalog = hiveSharedState.externalCatalog
-  protected[hive] def executionHive: HiveClientImpl = sessionState.executionHive
-  protected[hive] def metadataHive: HiveClient = sessionState.metadataHive
-
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -136,7 +159,7 @@
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

   protected[hive] def hiveThriftServerSingleSession: Boolean =
-    sparkContext.conf.getBoolean("spark.sql.hive.thriftServer.singleSession", defaultValue = false)
+    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()

@@ -504,9 +527,7 @@ private[hive] object HiveContext extends Logging {
    * The version of the Hive client that is used here must match the metastore that is configured
    * in the hive-site.xml file.
    */
-  protected[hive] def newClientForMetadata(
-      conf: SparkConf,
-      hadoopConf: Configuration): HiveClient = {
+  private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = {
     val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf])
     val configurations = hiveClientConfigurations(hiveConf)
     newClientForMetadata(conf, hiveConf, hadoopConf, configurations)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bc28b55d06..b992fda18c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.hive

 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.{python, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
 import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.internal.{SessionState, SQLConf}

@@ -32,16 +31,6 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {

-  /**
-   * A Hive client used for execution.
-   */
-  val executionHive: HiveClientImpl = ctx.hiveSharedState.executionHive.newSession()
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  val metadataHive: HiveClient = ctx.hiveSharedState.metadataHive.newSession()
-
   override lazy val conf: SQLConf = new SQLConf {
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
deleted file mode 100644
index 11097c33df..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
-import org.apache.spark.sql.internal.SharedState
-
-
-/**
- * A class that holds all state shared across sessions in a given [[HiveContext]].
- */
-private[hive] class HiveSharedState(override val sparkContext: SparkContext)
-  extends SharedState(sparkContext) {
-
-  // TODO: just share the IsolatedClientLoader instead of the client instances themselves
-
-  /**
-   * A Hive client used for execution.
-   */
-  val executionHive: HiveClientImpl = {
-    HiveContext.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
-  }
-
-  /**
-   * A Hive client used to interact with the metastore.
-   */
-  // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
-  lazy val metadataHive: HiveClient = {
-    HiveContext.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
-  }
-
-  /**
-   * A catalog that interacts with the Hive metastore.
-   */
-  override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index d56d36fe32..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -72,24 +72,63 @@ object TestHive
  * test cases that rely on TestHive must be serialized.
  */
 class TestHiveContext private[hive](
-    testHiveSharedState: TestHiveSharedState,
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    listener: SQLListener,
+    executionHive: HiveClientImpl,
+    metadataHive: HiveClient,
+    isRootContext: Boolean,
+    hiveCatalog: HiveExternalCatalog,
     val warehousePath: File,
     val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String],
-    isRootContext: Boolean)
-  extends HiveContext(testHiveSharedState, isRootContext) { self =>
+    metastoreTemporaryConf: Map[String, String])
+  extends HiveContext(
+    sc,
+    cacheManager,
+    listener,
+    executionHive,
+    metadataHive,
+    isRootContext,
+    hiveCatalog) { self =>
+
+  // Unfortunately, due to the complex interactions between the construction parameters
+  // and the limitations in scala constructors, we need many of these constructors to
+  // provide a shorthand to create a new TestHiveContext with only a SparkContext.
+  // This is not a great design pattern but it's necessary here.

   private def this(
       sc: SparkContext,
+      executionHive: HiveClientImpl,
+      metadataHive: HiveClient,
       warehousePath: File,
       scratchDirPath: File,
       metastoreTemporaryConf: Map[String, String]) {
     this(
-      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
+      sc,
+      new CacheManager,
+      SQLContext.createListenerAndUI(sc),
+      executionHive,
+      metadataHive,
+      true,
+      new HiveExternalCatalog(metadataHive),
       warehousePath,
       scratchDirPath,
-      metastoreTemporaryConf,
-      true)
+      metastoreTemporaryConf)
+  }
+
+  private def this(
+      sc: SparkContext,
+      warehousePath: File,
+      scratchDirPath: File,
+      metastoreTemporaryConf: Map[String, String]) {
+    this(
+      sc,
+      HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration),
+      TestHiveContext.newClientForMetadata(
+        sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf),
+      warehousePath,
+      scratchDirPath,
+      metastoreTemporaryConf)
   }

   def this(sc: SparkContext) {
@@ -102,11 +141,16 @@ class TestHiveContext private[hive](

   override def newSession(): HiveContext = {
     new TestHiveContext(
-      testHiveSharedState,
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      isRootContext = false)
+      sc = sc,
+      cacheManager = cacheManager,
+      listener = listener,
+      executionHive = executionHive.newSession(),
+      metadataHive = metadataHive.newSession(),
+      isRootContext = false,
+      hiveCatalog = hiveCatalog,
+      warehousePath = warehousePath,
+      scratchDirPath = scratchDirPath,
+      metastoreTemporaryConf = metastoreTemporaryConf)
   }

   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
@@ -505,22 +549,6 @@ private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
   }
 }

-
-private[hive] class TestHiveSharedState(
-    sc: SparkContext,
-    warehousePath: File,
-    scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String])
-  extends HiveSharedState(sc) {
-
-  override lazy val metadataHive: HiveClient = {
-    TestHiveContext.newClientForMetadata(
-      sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
-  }
-
-}
-
-
 private[hive] object TestHiveContext {

@@ -535,7 +563,7 @@
   /**
    * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
   */
-  def newClientForMetadata(
+  private def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration,
       warehousePath: File,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 84285b7f40..3334c16f0b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.hive

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.util.VersionInfo

 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
+import org.apache.spark.util.Utils

 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -29,9 +31,11 @@ import org.apache.spark.sql.hive.client.HiveClient
 class HiveExternalCatalogSuite extends CatalogTestCases {

   private val client: HiveClient = {
-    // We create a metastore at a temp location to avoid any potential
-    // conflict of having multiple connections to a single derby instance.
-    HiveContext.newClientForExecution(new SparkConf, new Configuration)
+    IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = new SparkConf(),
+      hadoopConf = new Configuration()).createClient()
   }

   protected override val utils: CatalogTestUtils = new CatalogTestUtils {
-- 
cgit v1.2.3
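
Editor's note on the pattern this commit reverts: before the revert, everything
shared across sessions (the CacheManager, the SQLListener, the ExternalCatalog,
and on the Hive side the execution and metadata clients) sat behind a single
SharedState field, so newSession() only had to re-pass one reference. The sketch
below models that shape in plain Scala. It is a minimal illustration only:
CacheManager, Listener, and Catalog are hypothetical stand-ins for the Spark
classes named in the deleted SharedState.scala above, not Spark's actual API.

// Stand-in components (hypothetical; they only mark the roles involved).
class CacheManager
class Listener
class Catalog

// One object owns all cross-session state, mirroring the deleted SharedState.
class SharedState {
  val cacheManager: CacheManager = new CacheManager
  val listener: Listener = new Listener
  // Lazy so a subclass (cf. HiveSharedState / TestHiveSharedState above)
  // can override how the catalog is built before it is first used.
  lazy val externalCatalog: Catalog = new Catalog
}

// Session-facing contexts hold a single reference to the shared bundle.
class Context private (val sharedState: SharedState, val isRootContext: Boolean) {
  def this() = this(new SharedState, isRootContext = true)

  // Forking a session re-passes the one shared handle and nothing else.
  def newSession(): Context = new Context(sharedState, isRootContext = false)
}

object SharedStateDemo extends App {
  val root = new Context()
  val session = root.newSession()
  // Both sessions resolve to the same shared bundle.
  assert(root.sharedState eq session.sharedState)
}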
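
For contrast, a sketch (reusing the stand-in types from the previous example) of
the shape this revert restores, where each shared component is its own constructor
parameter. The cost is visible in the hunks above: HiveContext.newSession()
re-threads seven arguments and TestHiveContext.newSession() ten, and forgetting
one would silently fork state between sessions.

// Post-revert shape: shared pieces passed one by one.
class FlatContext private (
    val cacheManager: CacheManager,
    val listener: Listener,
    val externalCatalog: Catalog,
    val isRootContext: Boolean) {

  def this() = this(new CacheManager, new Listener, new Catalog, isRootContext = true)

  // Every shared field must be repeated here; adding a new shared component
  // means touching this method in every subclass as well.
  def newSession(): FlatContext =
    new FlatContext(cacheManager, listener, externalCatalog, isRootContext = false)
}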