author    gatorsmile <gatorsmile@gmail.com>    2016-08-25 12:50:03 +0800
committer Wenchen Fan <wenchen@databricks.com>    2016-08-25 12:50:03 +0800
commit    4d0706d616176dc29ff3562e40cb00dd4eb9c302 (patch)
tree      19654fb381ddd754631268affdc9a4575b2ba4e6 /sql/hive/src
parent    ac27557eb622a257abeb3e8551f06ebc72f87133 (diff)
[SPARK-17190][SQL] Removal of HiveSharedState
### What changes were proposed in this pull request?

Since `HiveClient` is used to interact with the Hive metastore, it should be hidden inside `HiveExternalCatalog`. After moving `HiveClient` into `HiveExternalCatalog`, `HiveSharedState` becomes a thin wrapper around `HiveExternalCatalog`, so removing `HiveSharedState` is straightforward. After the removal of `HiveSharedState`, the reflection logic is applied directly to the choice of `ExternalCatalog` type, based on the `CATALOG_IMPLEMENTATION` configuration.

~~`HiveClient` is also used/invoked by entities other than `HiveExternalCatalog`; for those we define the following two APIs: getClient and getNewClient~~

### How was this patch tested?

The existing test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14757 from gatorsmile/removeHiveClient.
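To make the reflection step concrete, here is a minimal, hypothetical sketch of how an `ExternalCatalog` can be chosen and instantiated from the `spark.sql.catalogImplementation` setting (the key behind `CATALOG_IMPLEMENTATION`). This is illustrative, not the exact `SharedState` code; in particular, the diff below only shows the `(SparkConf, Configuration)` constructor for `HiveExternalCatalog`, so extending that signature to the in-memory catalog here is an assumption.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

// Illustrative only: resolve the catalog class from the configuration and
// instantiate it reflectively, so sql/core never links against Hive classes.
def reflectExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
  val className = conf.get("spark.sql.catalogImplementation", "in-memory") match {
    case "hive" => "org.apache.spark.sql.hive.HiveExternalCatalog"
    case _      => "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
  }
  // Assumes a (SparkConf, Configuration) constructor; the diff below adds
  // exactly that signature to HiveExternalCatalog.
  Class.forName(className)
    .getDeclaredConstructor(classOf[SparkConf], classOf[Configuration])
    .newInstance(conf, hadoopConf)
    .asInstanceOf[ExternalCatalog]
}
```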
Diffstat (limited to 'sql/hive/src')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala              |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala      | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala     |  3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala         |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala          | 47
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala            | 15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala       |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala     |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala     |  2
11 files changed, 35 insertions, 81 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 3cfe93234f..5393c57c9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -52,10 +52,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
sparkSession.sessionState.asInstanceOf[HiveSessionState]
}
- protected[sql] override def sharedState: HiveSharedState = {
- sparkSession.sharedState.asInstanceOf[HiveSharedState]
- }
-
/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index de3e60a44d..2586d11a6c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,13 +42,20 @@ import org.apache.spark.sql.types.{DataType, StructType}
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
*/
-private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
+private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
extends ExternalCatalog with Logging {
import CatalogTypes.TablePartitionSpec
import HiveExternalCatalog._
import CatalogTableType._
+ /**
+ * A Hive client used to interact with the metastore.
+ */
+ val client: HiveClient = {
+ HiveUtils.newClientForMetadata(conf, hadoopConf)
+ }
+
// Exceptions thrown by the hive client that we would like to wrap
private val clientExceptions = Set(
classOf[HiveException].getCanonicalName,
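
The net effect on call sites is uniform: everywhere the old code went through `HiveSharedState.metadataHive`, the new code reaches the client via the shared external catalog. A sketch of the access path (names taken from the hunks below; note that `sharedState` is only accessible from within Spark's own packages):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClient

// Post-patch access path: the shared HiveClient lives on the
// HiveExternalCatalog held by SharedState, not on a HiveSharedState wrapper.
def metadataHive(spark: SparkSession): HiveClient =
  spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
```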
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 181f470b2a..701b73a4aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,7 +44,8 @@ import org.apache.spark.sql.types._
*/
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
- private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
+ private val client =
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index e01c053ab5..a7cc7cc142 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -33,21 +33,18 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
self =>
- private lazy val sharedState: HiveSharedState = {
- sparkSession.sharedState.asInstanceOf[HiveSharedState]
- }
-
/**
* A Hive client used for interacting with the metastore.
*/
- lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
+ lazy val metadataHive: HiveClient =
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
/**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
new HiveSessionCatalog(
- sharedState.externalCatalog,
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
metadataHive,
sparkSession,
functionResourceLoader,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
deleted file mode 100644
index 12b4962fba..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SharedState
-
-
-/**
- * A class that holds all state shared across sessions in a given
- * [[org.apache.spark.sql.SparkSession]] backed by Hive.
- */
-private[hive] class HiveSharedState(override val sparkContext: SparkContext)
- extends SharedState(sparkContext) {
-
- // TODO: just share the IsolatedClientLoader instead of the client instance itself
-
- /**
- * A Hive client used to interact with the metastore.
- */
- // This needs to be a lazy val at here because TestHiveSharedState is overriding it.
- lazy val metadataHive: HiveClient = {
- HiveUtils.newClientForMetadata(sparkContext.conf, sparkContext.hadoopConfiguration)
- }
-
- /**
- * A catalog that interacts with the Hive metastore.
- */
- override lazy val externalCatalog =
- new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index cdc8d610d3..163f210802 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SharedState, SQLConf}
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -108,13 +108,13 @@ class TestHiveContext(
* A [[SparkSession]] used in [[TestHiveContext]].
*
* @param sc SparkContext
- * @param existingSharedState optional [[HiveSharedState]]
+ * @param existingSharedState optional [[SharedState]]
* @param loadTestTables if true, load the test tables. They can only be loaded when running
* in the JVM, i.e. when calling from Python this flag has to be false.
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
- @transient private val existingSharedState: Option[HiveSharedState],
+ @transient private val existingSharedState: Option[SharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -139,14 +139,13 @@ private[hive] class TestHiveSparkSession(
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
- // TODO: Let's remove HiveSharedState and TestHiveSessionState. Otherwise,
- // we are not really testing the reflection logic based on the setting of
- // CATALOG_IMPLEMENTATION.
@transient
- override lazy val sharedState: HiveSharedState = {
- existingSharedState.getOrElse(new HiveSharedState(sc))
+ override lazy val sharedState: SharedState = {
+ existingSharedState.getOrElse(new SharedState(sc))
}
+ // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really testing the reflection
+ // logic based on the setting of CATALOG_IMPLEMENTATION.
@transient
override lazy val sessionState: TestHiveSessionState =
new TestHiveSessionState(self)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index 23798431e6..96e9054cd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -31,7 +31,7 @@ class HiveDataFrameSuite extends QueryTest with TestHiveSingleton {
}
test("SPARK-15887: hive-site.xml should be loaded") {
- val hiveClient = spark.sharedState.asInstanceOf[HiveSharedState].metadataHive
+ val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
assert(hiveClient.getConf("hive.in.test", "") == "true")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 175889b08b..26c2549820 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -21,26 +21,26 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.hive.client.HiveClient
/**
* Test suite for the [[HiveExternalCatalog]].
*/
class HiveExternalCatalogSuite extends ExternalCatalogSuite {
- private val client: HiveClient = {
- // We create a metastore at a temp location to avoid any potential
- // conflict of having multiple connections to a single derby instance.
- HiveUtils.newClientForExecution(new SparkConf, new Configuration)
+ private val externalCatalog: HiveExternalCatalog = {
+ val catalog = new HiveExternalCatalog(new SparkConf, new Configuration)
+ catalog.client.reset()
+ catalog
}
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
- override def newEmptyCatalog(): ExternalCatalog =
- new HiveExternalCatalog(client, new Configuration())
+ override def newEmptyCatalog(): ExternalCatalog = externalCatalog
}
- protected override def resetState(): Unit = client.reset()
+ protected override def resetState(): Unit = {
+ externalCatalog.client.reset()
+ }
}
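
Condensed from the hunk above, the test setup now reduces to two steps: construct the catalog, which builds its own client internally, and reset that client so each run starts from a clean metastore.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveExternalCatalog

// The catalog now creates its HiveClient itself, so a suite needs only a
// SparkConf and a Hadoop Configuration.
val catalog = new HiveExternalCatalog(new SparkConf, new Configuration)
catalog.client.reset() // wipe metastore state left over from earlier tests
```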
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd8fec0c15..af28286666 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -378,10 +378,9 @@ object SetMetastoreURLTest extends Logging {
s"spark.sql.test.expectedMetastoreURL should be set.")
}
- // HiveSharedState is used when Hive support is enabled.
+ // HiveExternalCatalog is used when Hive support is enabled.
val actualMetastoreURL =
- spark.sharedState.asInstanceOf[HiveSharedState]
- .metadataHive
+ spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
.getConf("javax.jdo.option.ConnectionURL", "this_is_a_wrong_URL")
logInfo(s"javax.jdo.option.ConnectionURL is $actualMetastoreURL")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 571ba49d11..d77bb5cf95 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -51,7 +51,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// To test `HiveExternalCatalog`, we need to read the raw table metadata (schema, partition
// columns and bucket specification are still in table properties) from hive client.
- private def hiveClient: HiveClient = sharedState.asInstanceOf[HiveSharedState].metadataHive
+ private def hiveClient: HiveClient =
+ sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
test("persistent JSON table") {
withTable("jsonTable") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 68f1bb60f6..e925921165 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -266,7 +266,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
}
private def createRawHiveTable(ddl: String): Unit = {
- hiveContext.sharedState.asInstanceOf[HiveSharedState].metadataHive.runSqlHive(ddl)
+ hiveContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(ddl)
}
private def checkCreateTable(table: String): Unit = {