diff options
Diffstat (limited to 'sql/hive')
11 files changed, 35 insertions, 81 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 3cfe93234f..5393c57c9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -52,10 +52,6 @@ class HiveContext private[hive](_sparkSession: SparkSession) sparkSession.sessionState.asInstanceOf[HiveSessionState] } - protected[sql] override def sharedState: HiveSharedState = { - sparkSession.sharedState.asInstanceOf[HiveSharedState] - } - /** * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index de3e60a44d..2586d11a6c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.ql.metadata.HiveException import org.apache.thrift.TException +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier @@ -41,13 +42,20 @@ import org.apache.spark.sql.types.{DataType, StructType} * A persistent implementation of the system catalog using Hive. * All public methods must be synchronized for thread-safety. */ -private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration) +private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog with Logging { import CatalogTypes.TablePartitionSpec import HiveExternalCatalog._ import CatalogTableType._ + /** + * A Hive client used to interact with the metastore. + */ + val client: HiveClient = { + HiveUtils.newClientForMetadata(conf, hadoopConf) + } + // Exceptions thrown by the hive client that we would like to wrap private val clientExceptions = Set( classOf[HiveException].getCanonicalName, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 181f470b2a..701b73a4aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -44,7 +44,8 @@ import org.apache.spark.sql.types._ */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive + private val client = + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index e01c053ab5..a7cc7cc142 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -33,21 +33,18 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) self => - private lazy val sharedState: HiveSharedState = { - sparkSession.sharedState.asInstanceOf[HiveSharedState] - } - /** * A Hive client used for interacting with the metastore. */ - lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession() + lazy val metadataHive: HiveClient = + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession() /** * Internal catalog for managing table and database states. */ override lazy val catalog = { new HiveSessionCatalog( - sharedState.externalCatalog, + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], metadataHive, sparkSession, functionResourceLoader, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala deleted file mode 100644 index 12b4962fba..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.SparkContext -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.SharedState - - -/** - * A class that holds all state shared across sessions in a given - * [[org.apache.spark.sql.SparkSession]] backed by Hive. - */ -private[hive] class HiveSharedState(override val sparkContext: SparkContext) - extends SharedState(sparkContext) { - - // TODO: just share the IsolatedClientLoader instead of the client instance itself - - /** - * A Hive client used to interact with the metastore. - */ - // This needs to be a lazy val at here because TestHiveSharedState is overriding it. - lazy val metadataHive: HiveClient = { - HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration) - } - - /** - * A catalog that interacts with the Hive metastore. - */ - override lazy val externalCatalog = - new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration) -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index cdc8d610d3..163f210802 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SharedState, SQLConf} import org.apache.spark.util.{ShutdownHookManager, Utils} // SPARK-3729: Test key required to check for initialization errors with config. @@ -108,13 +108,13 @@ class TestHiveContext( * A [[SparkSession]] used in [[TestHiveContext]]. * * @param sc SparkContext - * @param existingSharedState optional [[HiveSharedState]] + * @param existingSharedState optional [[SharedState]] * @param loadTestTables if true, load the test tables. They can only be loaded when running * in the JVM, i.e when calling from Python this flag has to be false. */ private[hive] class TestHiveSparkSession( @transient private val sc: SparkContext, - @transient private val existingSharedState: Option[HiveSharedState], + @transient private val existingSharedState: Option[SharedState], private val loadTestTables: Boolean) extends SparkSession(sc) with Logging { self => @@ -139,14 +139,13 @@ private[hive] class TestHiveSparkSession( assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive") - // TODO: Let's remove HiveSharedState and TestHiveSessionState. Otherwise, - // we are not really testing the reflection logic based on the setting of - // CATALOG_IMPLEMENTATION. @transient - override lazy val sharedState: HiveSharedState = { - existingSharedState.getOrElse(new HiveSharedState(sc)) + override lazy val sharedState: SharedState = { + existingSharedState.getOrElse(new SharedState(sc)) } + // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection + // logic based on the setting of CATALOG_IMPLEMENTATION. @transient override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala index 23798431e6..96e9054cd4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala @@ -31,7 +31,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton { } test("SPARK-15887: hive-site.xml should be loaded") { - val hiveClient = spark.sharedState.asInstanceOf[HiveSharedState].metadataHive + val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client assert(hiveClient.getConf("hive.in.test", "") == "true") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 175889b08b..26c2549820 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -21,26 +21,26 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.hive.client.HiveClient /** * Test suite for the [[HiveExternalCatalog]]. */ class HiveExternalCatalogSuite extends ExternalCatalogSuite { - private val client: HiveClient = { - // We create a metastore at a temp location to avoid any potential - // conflict of having multiple connections to a single derby instance. - HiveUtils.newClientForExecution(new SparkConf, new Configuration) + private val externalCatalog: HiveExternalCatalog = { + val catalog = new HiveExternalCatalog(new SparkConf, new Configuration) + catalog.client.reset() + catalog } protected override val utils: CatalogTestUtils = new CatalogTestUtils { override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat" - override def newEmptyCatalog(): ExternalCatalog = - new HiveExternalCatalog(client, new Configuration()) + override def newEmptyCatalog(): ExternalCatalog = externalCatalog } - protected override def resetState(): Unit = client.reset() + protected override def resetState(): Unit = { + externalCatalog.client.reset() + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index dd8fec0c15..af28286666 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -378,10 +378,9 @@ object SetMetastoreURLTest extends Logging { s"spark.sql.test.expectedMetastoreURL should be set.") } - // HiveSharedState is used when Hive support is enabled. + // HiveExternalCatalog is used when Hive support is enabled. val actualMetastoreURL = - spark.sharedState.asInstanceOf[HiveSharedState] - .metadataHive + spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client .getConf("javax.jdo.option.ConnectionURL", "this_is_a_wrong_URL") logInfo(s"javax.jdo.option.ConnectionURL is $actualMetastoreURL") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 571ba49d11..d77bb5cf95 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -51,7 +51,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // To test `HiveExternalCatalog`, we need to read the raw table metadata(schema, partition // columns and bucket specification are still in table properties) from hive client. - private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive + private def hiveClient: HiveClient = + sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client test("persistent JSON table") { withTable("jsonTable") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 68f1bb60f6..e925921165 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -266,7 +266,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing } private def createRawHiveTable(ddl: String): Unit = { - hiveContext.sharedState.asInstanceOf[HiveSharedState].metadataHive.runSqlHive(ddl) + hiveContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(ddl) } private def checkCreateTable(table: String): Unit = { |