path: root/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 285
1 file changed, 285 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
new file mode 100644
index 0000000000..f627384253
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.thrift.TException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.client.HiveClient
+
+
+/**
+ * A persistent implementation of the system catalog using Hive.
+ * All public methods must be synchronized for thread-safety.
+ */
+private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+ import ExternalCatalog._
+
+ // Exceptions thrown by the hive client that we would like to wrap
+ private val clientExceptions = Set(
+ classOf[HiveException].getCanonicalName,
+ classOf[TException].getCanonicalName)
+
+ /**
+ * Whether this is an exception thrown by the hive client that should be wrapped.
+ *
+ * Due to classloader isolation issues, pattern matching won't work here, so we need
+ * to compare the canonical names of the exceptions, which we assume to be stable.
+ */
+ private def isClientException(e: Throwable): Boolean = {
+ var temp: Class[_] = e.getClass
+ var found = false
+ while (temp != null && !found) {
+ found = clientExceptions.contains(temp.getCanonicalName)
+ temp = temp.getSuperclass
+ }
+ found
+ }
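+
+ // For example, a MetaException (a TException subclass) raised inside the
+ // isolated Hive classloader fails an `isInstanceOf[TException]` check in
+ // this classloader, but the superclass walk above still matches it by
+ // canonical name (illustrative sketch):
+ //
+ //   isClientException(new MetaException("..."))  // true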
+
+ /**
+ * Run some code involving `client` in a [[synchronized]] block and wrap certain
+ * exceptions thrown in the process in [[AnalysisException]].
+ */
+ private def withClient[T](body: => T): T = synchronized {
+ try {
+ body
+ } catch {
+ case NonFatal(e) if isClientException(e) =>
+ throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+ }
+ }
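+
+ // As a result, a metastore failure reaches callers as an AnalysisException
+ // whose message embeds the original class and message, e.g. (illustrative):
+ //   org.apache.thrift.TException: could not connect to metastore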
+
+ private def requireDbMatches(db: String, table: CatalogTable): Unit = {
+ if (table.identifier.database != Some(db)) {
+ throw new AnalysisException(
+ s"Provided database $db does not match the one specified in the " +
+ s"table definition (${table.identifier.database.getOrElse("n/a")})")
+ }
+ }
+
+ private def requireTableExists(db: String, table: String): Unit = {
+ withClient { getTable(db, table) }
+ }
+
+ // --------------------------------------------------------------------------
+ // Databases
+ // --------------------------------------------------------------------------
+
+ override def createDatabase(
+ dbDefinition: CatalogDatabase,
+ ignoreIfExists: Boolean): Unit = withClient {
+ client.createDatabase(dbDefinition, ignoreIfExists)
+ }
+
+ override def dropDatabase(
+ db: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit = withClient {
+ client.dropDatabase(db, ignoreIfNotExists, cascade)
+ }
+
+ /**
+ * Alter a database whose name matches the one specified in `dbDefinition`,
+ * assuming the database exists.
+ *
+ * Note: As of now, this only supports altering database properties!
+ */
+ override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+ val existingDb = getDatabase(dbDefinition.name)
+ if (existingDb.properties == dbDefinition.properties) {
+ logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
+ s"the provided database properties are the same as the old ones. Hive does not " +
+ s"currently support altering other database fields.")
+ }
+ client.alterDatabase(dbDefinition)
+ }
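+
+ // Illustrative usage (hypothetical database name): only the change to
+ // `properties` below takes effect; Hive ignores edits to other fields.
+ //
+ //   val db = getDatabase("sales")
+ //   alterDatabase(db.copy(properties = db.properties + ("owner" -> "etl")))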
+
+ override def getDatabase(db: String): CatalogDatabase = withClient {
+ client.getDatabase(db)
+ }
+
+ override def databaseExists(db: String): Boolean = withClient {
+ client.getDatabaseOption(db).isDefined
+ }
+
+ override def listDatabases(): Seq[String] = withClient {
+ client.listDatabases("*")
+ }
+
+ override def listDatabases(pattern: String): Seq[String] = withClient {
+ client.listDatabases(pattern)
+ }
+
+ override def setCurrentDatabase(db: String): Unit = withClient {
+ client.setCurrentDatabase(db)
+ }
+
+ // --------------------------------------------------------------------------
+ // Tables
+ // --------------------------------------------------------------------------
+
+ override def createTable(
+ db: String,
+ tableDefinition: CatalogTable,
+ ignoreIfExists: Boolean): Unit = withClient {
+ requireDbExists(db)
+ requireDbMatches(db, tableDefinition)
+ client.createTable(tableDefinition, ignoreIfExists)
+ }
+
+ override def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean): Unit = withClient {
+ requireDbExists(db)
+ client.dropTable(db, table, ignoreIfNotExists)
+ }
+
+ override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+ val newTable = client.getTable(db, oldName)
+ .copy(identifier = TableIdentifier(newName, Some(db)))
+ client.alterTable(oldName, newTable)
+ }
+
+ /**
+ * Alter a table whose name matches the one specified in `tableDefinition`,
+ * assuming the table exists.
+ *
+ * Note: As of now, this only supports altering table properties, serde properties,
+ * and num buckets!
+ */
+ override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient {
+ requireDbMatches(db, tableDefinition)
+ requireTableExists(db, tableDefinition.identifier.table)
+ client.alterTable(tableDefinition)
+ }
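+
+ // Illustrative usage (hypothetical table name): update a table property;
+ // fields outside properties, serde properties, and num buckets are not
+ // propagated by Hive's alterTable.
+ //
+ //   val t = getTable("sales", "orders")
+ //   alterTable("sales", t.copy(properties = t.properties + ("note" -> "fact")))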
+
+ override def getTable(db: String, table: String): CatalogTable = withClient {
+ client.getTable(db, table)
+ }
+
+ override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient {
+ client.getTableOption(db, table)
+ }
+
+ override def tableExists(db: String, table: String): Boolean = withClient {
+ client.getTableOption(db, table).isDefined
+ }
+
+ override def listTables(db: String): Seq[String] = withClient {
+ requireDbExists(db)
+ client.listTables(db)
+ }
+
+ override def listTables(db: String, pattern: String): Seq[String] = withClient {
+ requireDbExists(db)
+ client.listTables(db, pattern)
+ }
+
+ // --------------------------------------------------------------------------
+ // Partitions
+ // --------------------------------------------------------------------------
+
+ override def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+ client.createPartitions(db, table, parts, ignoreIfExists)
+ }
+
+ override def dropPartitions(
+ db: String,
+ table: String,
+ parts: Seq[TablePartitionSpec],
+ ignoreIfNotExists: Boolean): Unit = withClient {
+ requireTableExists(db, table)
+ client.dropPartitions(db, table, parts, ignoreIfNotExists)
+ }
+
+ override def renamePartitions(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec],
+ newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
+ client.renamePartitions(db, table, specs, newSpecs)
+ }
+
+ override def alterPartitions(
+ db: String,
+ table: String,
+ newParts: Seq[CatalogTablePartition]): Unit = withClient {
+ client.alterPartitions(db, table, newParts)
+ }
+
+ override def getPartition(
+ db: String,
+ table: String,
+ spec: TablePartitionSpec): CatalogTablePartition = withClient {
+ client.getPartition(db, table, spec)
+ }
+
+ override def listPartitions(
+ db: String,
+ table: String): Seq[CatalogTablePartition] = withClient {
+ client.getAllPartitions(db, table)
+ }
+
+ // --------------------------------------------------------------------------
+ // Functions
+ // --------------------------------------------------------------------------
+
+ override def createFunction(
+ db: String,
+ funcDefinition: CatalogFunction): Unit = withClient {
+ // Hive's metastore is case insensitive. However, Hive's createFunction does
+ // not normalize the function name (unlike getFunction), so we normalize the
+ // name ourselves before passing the definition to the client.
+ val functionName = funcDefinition.identifier.funcName.toLowerCase
+ val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
+ client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
+ }
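+
+ // For example, a definition whose identifier is
+ // FunctionIdentifier("MyUDF", Some("default")) is persisted as "myudf",
+ // so a later getFunction("default", "myudf") finds it (illustrative names).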
+
+ override def dropFunction(db: String, name: String): Unit = withClient {
+ client.dropFunction(db, name)
+ }
+
+ override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+ client.renameFunction(db, oldName, newName)
+ }
+
+ override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+ client.getFunction(db, funcName)
+ }
+
+ override def functionExists(db: String, funcName: String): Boolean = withClient {
+ client.functionExists(db, funcName)
+ }
+
+ override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+ client.listFunctions(db, pattern)
+ }
+
+}