From 142f6d14928c780cc9e8d6d7749c5d7c08a30972 Mon Sep 17 00:00:00 2001 From: Kunal Khamar Date: Wed, 29 Mar 2017 12:35:19 -0700 Subject: [SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners ## What changes were proposed in this pull request? Bugfix from [SPARK-19540.](https://github.com/apache/spark/pull/16826) Cloning SessionState does not clone query execution listeners, so cloned session is unable to listen to events on queries. ## How was this patch tested? - Unit test Author: Kunal Khamar Closes #17379 from kunalkhamar/clone-bugfix. --- .../apache/spark/sql/hive/HiveSessionState.scala | 141 --------------------- .../spark/sql/hive/HiveSessionStateBuilder.scala | 129 +++++++++++++++++++ .../org/apache/spark/sql/hive/test/TestHive.scala | 2 +- .../spark/sql/hive/HiveSessionStateSuite.scala | 2 +- 4 files changed, 131 insertions(+), 143 deletions(-) delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala deleted file mode 100644 index f49e6bb418..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.annotation.{Experimental, InterfaceStability} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.Analyzer -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlanner -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} - -/** - * Entry object for creating a Hive aware [[SessionState]]. - */ -private[hive] object HiveSessionState { - /** - * Create a new Hive aware [[SessionState]]. for the given session. - */ - def apply(session: SparkSession): SessionState = { - new HiveSessionStateBuilder(session).build() - } -} - -/** - * Builder that produces a [[HiveSessionState]]. - */ -@Experimental -@InterfaceStability.Unstable -class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None) - extends BaseSessionStateBuilder(session, parentState) { - - private def externalCatalog: HiveExternalCatalog = - session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] - - /** - * Create a Hive aware resource loader. - */ - override protected lazy val resourceLoader: HiveSessionResourceLoader = { - val client: HiveClient = externalCatalog.client.newSession() - new HiveSessionResourceLoader(session, client) - } - - /** - * Create a [[HiveSessionCatalog]]. - */ - override protected lazy val catalog: HiveSessionCatalog = { - val catalog = new HiveSessionCatalog( - externalCatalog, - session.sharedState.globalTempViewManager, - new HiveMetastoreCatalog(session), - functionRegistry, - conf, - SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), - sqlParser, - resourceLoader) - parentState.foreach(_.catalog.copyStateTo(catalog)) - catalog - } - - /** - * A logical query plan `Analyzer` with rules specific to Hive. - */ - override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { - override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = - new ResolveHiveSerdeTable(session) +: - new FindDataSourceTable(session) +: - new ResolveSQLOnFile(session) +: - customResolutionRules - - override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = - new DetermineTableStats(session) +: - catalog.ParquetConversions +: - catalog.OrcConversions +: - PreprocessTableCreation(session) +: - PreprocessTableInsertion(conf) +: - DataSourceAnalysis(conf) +: - HiveAnalysis +: - customPostHocResolutionRules - - override val extendedCheckRules: Seq[LogicalPlan => Unit] = - PreWriteCheck +: - customCheckRules - } - - /** - * Planner that takes into account Hive-specific strategies. - */ - override protected def planner: SparkPlanner = { - new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies { - override val sparkSession: SparkSession = session - - override def extraPlanningStrategies: Seq[Strategy] = - super.extraPlanningStrategies ++ customPlanningStrategies - - override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ - extraPlanningStrategies ++ Seq( - FileSourceStrategy, - DataSourceStrategy, - SpecialLimits, - InMemoryScans, - HiveTableScans, - Scripts, - Aggregation, - JoinSelection, - BasicOperators - ) - } - } - } - - override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) -} - -class HiveSessionResourceLoader( - session: SparkSession, - client: HiveClient) - extends SessionResourceLoader(session) { - override def addJar(path: String): Unit = { - client.addJar(path) - super.addJar(path) - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala new file mode 100644 index 0000000000..8048c2ba2c --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.annotation.{Experimental, InterfaceStability} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlanner +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} + +/** + * Builder that produces a Hive aware [[SessionState]]. + */ +@Experimental +@InterfaceStability.Unstable +class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None) + extends BaseSessionStateBuilder(session, parentState) { + + private def externalCatalog: HiveExternalCatalog = + session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] + + /** + * Create a Hive aware resource loader. + */ + override protected lazy val resourceLoader: HiveSessionResourceLoader = { + val client: HiveClient = externalCatalog.client.newSession() + new HiveSessionResourceLoader(session, client) + } + + /** + * Create a [[HiveSessionCatalog]]. + */ + override protected lazy val catalog: HiveSessionCatalog = { + val catalog = new HiveSessionCatalog( + externalCatalog, + session.sharedState.globalTempViewManager, + new HiveMetastoreCatalog(session), + functionRegistry, + conf, + SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), + sqlParser, + resourceLoader) + parentState.foreach(_.catalog.copyStateTo(catalog)) + catalog + } + + /** + * A logical query plan `Analyzer` with rules specific to Hive. + */ + override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = + new ResolveHiveSerdeTable(session) +: + new FindDataSourceTable(session) +: + new ResolveSQLOnFile(session) +: + customResolutionRules + + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = + new DetermineTableStats(session) +: + catalog.ParquetConversions +: + catalog.OrcConversions +: + PreprocessTableCreation(session) +: + PreprocessTableInsertion(conf) +: + DataSourceAnalysis(conf) +: + HiveAnalysis +: + customPostHocResolutionRules + + override val extendedCheckRules: Seq[LogicalPlan => Unit] = + PreWriteCheck +: + customCheckRules + } + + /** + * Planner that takes into account Hive-specific strategies. + */ + override protected def planner: SparkPlanner = { + new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies { + override val sparkSession: SparkSession = session + + override def extraPlanningStrategies: Seq[Strategy] = + super.extraPlanningStrategies ++ customPlanningStrategies + + override def strategies: Seq[Strategy] = { + experimentalMethods.extraStrategies ++ + extraPlanningStrategies ++ Seq( + FileSourceStrategy, + DataSourceStrategy, + SpecialLimits, + InMemoryScans, + HiveTableScans, + Scripts, + Aggregation, + JoinSelection, + BasicOperators + ) + } + } + } + + override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _) +} + +class HiveSessionResourceLoader( + session: SparkSession, + client: HiveClient) + extends SessionResourceLoader(session) { + override def addJar(path: String): Unit = { + client.addJar(path) + super.addJar(path) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 0bcf219922..d9bb1f8c7e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal._ +import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf} import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.util.{ShutdownHookManager, Utils} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala index 67c77fb62f..958ad3e1c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.hive.test.TestHiveSingleton /** - * Run all tests from `SessionStateSuite` with a `HiveSessionState`. + * Run all tests from `SessionStateSuite` with a Hive based `SessionState`. */ class HiveSessionStateSuite extends SessionStateSuite with TestHiveSingleton with BeforeAndAfterEach { -- cgit v1.2.3