aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2017-04-21 00:05:03 -0700
committerReynold Xin <rxin@databricks.com>2017-04-21 00:05:03 -0700
commite2b3d2367a563d4600d8d87b5317e71135c362f0 (patch)
tree34a3c4e4d5fdaa95a720516284051a09275b8d66
parent48d760d028dd73371f99d084c4195dbc4dda5267 (diff)
downloadspark-e2b3d2367a563d4600d8d87b5317e71135c362f0.tar.gz
spark-e2b3d2367a563d4600d8d87b5317e71135c362f0.tar.bz2
spark-e2b3d2367a563d4600d8d87b5317e71135c362f0.zip
[SPARK-20420][SQL] Add events to the external catalog
## What changes were proposed in this pull request? It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change. The following events are fired per object: - Database - CreateDatabasePreEvent: event fired before the database is created. - CreateDatabaseEvent: event fired after the database has been created. - DropDatabasePreEvent: event fired before the database is dropped. - DropDatabaseEvent: event fired after the database has been dropped. - Table - CreateTablePreEvent: event fired before the table is created. - CreateTableEvent: event fired after the table has been created. - RenameTablePreEvent: event fired before the table is renamed. - RenameTableEvent: event fired after the table has been renamed. - DropTablePreEvent: event fired before the table is dropped. - DropTableEvent: event fired after the table has been dropped. - Function - CreateFunctionPreEvent: event fired before the function is created. - CreateFunctionEvent: event fired after the function has been created. - RenameFunctionPreEvent: event fired before the function is renamed. - RenameFunctionEvent: event fired after the function has been renamed. - DropFunctionPreEvent: event fired before the function is dropped. - DropFunctionPreEvent: event fired after the function has been dropped. The current events currently only contain the names of the object modified. We add more events, and more details at a later point. A user can monitor changes to the external catalog by adding a listener to the Spark listener bus checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is add listener directly to the `ExternalCatalog`. ## How was this patch tested? Added the `ExternalCatalogEventSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17710 from hvanhovell/SPARK-20420.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala85
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala22
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala158
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala188
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala7
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala22
6 files changed, 457 insertions, 25 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 08a01e8601..974ef900e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ListenerBus
/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
-abstract class ExternalCatalog {
+abstract class ExternalCatalog
+ extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec
protected def requireDbExists(db: String): Unit = {
@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+ final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+ val db = dbDefinition.name
+ postToAll(CreateDatabasePreEvent(db))
+ doCreateDatabase(dbDefinition, ignoreIfExists)
+ postToAll(CreateDatabaseEvent(db))
+ }
+
+ protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+ final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+ postToAll(DropDatabasePreEvent(db))
+ doDropDatabase(db, ignoreIfNotExists, cascade)
+ postToAll(DropDatabaseEvent(db))
+ }
- def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+ protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Alter a database whose name matches the one specified in `dbDefinition`,
@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------
- def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
+ final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ val db = tableDefinition.database
+ val name = tableDefinition.identifier.table
+ postToAll(CreateTablePreEvent(db, name))
+ doCreateTable(tableDefinition, ignoreIfExists)
+ postToAll(CreateTableEvent(db, name))
+ }
- def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
+ protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
- def renameTable(db: String, oldName: String, newName: String): Unit
+ final def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ postToAll(DropTablePreEvent(db, table))
+ doDropTable(db, table, ignoreIfNotExists, purge)
+ postToAll(DropTableEvent(db, table))
+ }
+
+ protected def doDropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
+
+ final def renameTable(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameTablePreEvent(db, oldName, newName))
+ doRenameTable(db, oldName, newName)
+ postToAll(RenameTableEvent(db, oldName, newName))
+ }
+
+ protected def doRenameTable(db: String, oldName: String, newName: String): Unit
/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
@@ -269,11 +312,30 @@ abstract class ExternalCatalog {
// Functions
// --------------------------------------------------------------------------
- def createFunction(db: String, funcDefinition: CatalogFunction): Unit
+ final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+ val name = funcDefinition.identifier.funcName
+ postToAll(CreateFunctionPreEvent(db, name))
+ doCreateFunction(db, funcDefinition)
+ postToAll(CreateFunctionEvent(db, name))
+ }
- def dropFunction(db: String, funcName: String): Unit
+ protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
- def renameFunction(db: String, oldName: String, newName: String): Unit
+ final def dropFunction(db: String, funcName: String): Unit = {
+ postToAll(DropFunctionPreEvent(db, funcName))
+ doDropFunction(db, funcName)
+ postToAll(DropFunctionEvent(db, funcName))
+ }
+
+ protected def doDropFunction(db: String, funcName: String): Unit
+
+ final def renameFunction(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameFunctionPreEvent(db, oldName, newName))
+ doRenameFunction(db, oldName, newName)
+ postToAll(RenameFunctionEvent(db, oldName, newName))
+ }
+
+ protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
def getFunction(db: String, funcName: String): CatalogFunction
@@ -281,4 +343,9 @@ abstract class ExternalCatalog {
def listFunctions(db: String, pattern: String): Seq[String]
+ override protected def doPostEvent(
+ listener: ExternalCatalogEventListener,
+ event: ExternalCatalogEvent): Unit = {
+ listener.onEvent(event)
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9ca1c71d1d..81dd8efc00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -98,7 +98,7 @@ class InMemoryCatalog(
// Databases
// --------------------------------------------------------------------------
- override def createDatabase(
+ override protected def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
@@ -119,7 +119,7 @@ class InMemoryCatalog(
}
}
- override def dropDatabase(
+ override protected def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
@@ -180,7 +180,7 @@ class InMemoryCatalog(
// Tables
// --------------------------------------------------------------------------
- override def createTable(
+ override protected def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
@@ -221,7 +221,7 @@ class InMemoryCatalog(
}
}
- override def dropTable(
+ override protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
@@ -264,7 +264,10 @@ class InMemoryCatalog(
}
}
- override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+ override protected def doRenameTable(
+ db: String,
+ oldName: String,
+ newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
@@ -565,18 +568,21 @@ class InMemoryCatalog(
// Functions
// --------------------------------------------------------------------------
- override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+ override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
requireFunctionNotExists(db, func.identifier.funcName)
catalog(db).functions.put(func.identifier.funcName, func)
}
- override def dropFunction(db: String, funcName: String): Unit = synchronized {
+ override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}
- override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+ override protected def doRenameFunction(
+ db: String,
+ oldName: String,
+ newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
new file mode 100644
index 0000000000..459973a13b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * Event emitted by the external catalog when it is modified. Events are either fired before or
+ * after the modification (the event should document this).
+ */
+trait ExternalCatalogEvent extends SparkListenerEvent
+
+/**
+ * Listener interface for external catalog modification events.
+ */
+trait ExternalCatalogEventListener {
+ def onEvent(event: ExternalCatalogEvent): Unit
+}
+
+/**
+ * Event fired when a database is create or dropped.
+ */
+trait DatabaseEvent extends ExternalCatalogEvent {
+ /**
+ * Database of the object that was touched.
+ */
+ val database: String
+}
+
+/**
+ * Event fired before a database is created.
+ */
+case class CreateDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been created.
+ */
+case class CreateDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired before a database is dropped.
+ */
+case class DropDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been dropped.
+ */
+case class DropDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired when a table is created, dropped or renamed.
+ */
+trait TableEvent extends DatabaseEvent {
+ /**
+ * Name of the table that was touched.
+ */
+ val name: String
+}
+
+/**
+ * Event fired before a table is created.
+ */
+case class CreateTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been created.
+ */
+case class CreateTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is dropped.
+ */
+case class DropTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been dropped.
+ */
+case class DropTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is renamed.
+ */
+case class RenameTablePreEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends TableEvent
+
+/**
+ * Event fired after a table has been renamed.
+ */
+case class RenameTableEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped or renamed.
+ */
+trait FunctionEvent extends DatabaseEvent {
+ /**
+ * Name of the function that was touched.
+ */
+ val name: String
+}
+
+/**
+ * Event fired before a function is created.
+ */
+case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been created.
+ */
+case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is dropped.
+ */
+case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been dropped.
+ */
+case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is renamed.
+ */
+case class RenameFunctionPreEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends FunctionEvent
+
+/**
+ * Event fired after a function has been renamed.
+ */
+case class RenameFunctionEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends FunctionEvent
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
new file mode 100644
index 0000000000..2539ea615f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.catalog
+
+import java.net.URI
+import java.nio.file.{Files, Path}
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Test Suite for external catalog events
+ */
+class ExternalCatalogEventSuite extends SparkFunSuite {
+
+ protected def newCatalog: ExternalCatalog = new InMemoryCatalog()
+
+ private def testWithCatalog(
+ name: String)(
+ f: (ExternalCatalog, Seq[ExternalCatalogEvent] => Unit) => Unit): Unit = test(name) {
+ val catalog = newCatalog
+ val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
+ catalog.addListener(new ExternalCatalogEventListener {
+ override def onEvent(event: ExternalCatalogEvent): Unit = {
+ recorder += event
+ }
+ })
+ f(catalog, (expected: Seq[ExternalCatalogEvent]) => {
+ val actual = recorder.clone()
+ recorder.clear()
+ assert(expected === actual)
+ })
+ }
+
+ private def createDbDefinition(uri: URI): CatalogDatabase = {
+ CatalogDatabase(name = "db5", description = "", locationUri = uri, Map.empty)
+ }
+
+ private def createDbDefinition(): CatalogDatabase = {
+ createDbDefinition(preparePath(Files.createTempDirectory("db_")))
+ }
+
+ private def preparePath(path: Path): URI = path.normalize().toUri
+
+ testWithCatalog("database") { (catalog, checkEvents) =>
+ // CREATE
+ val dbDefinition = createDbDefinition()
+
+ catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+ checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+ catalog.createDatabase(dbDefinition, ignoreIfExists = true)
+ checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+ intercept[AnalysisException] {
+ catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+ }
+ checkEvents(CreateDatabasePreEvent("db5") :: Nil)
+
+ // DROP
+ intercept[AnalysisException] {
+ catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
+ }
+ checkEvents(DropDatabasePreEvent("db4") :: Nil)
+
+ catalog.dropDatabase("db5", ignoreIfNotExists = false, cascade = false)
+ checkEvents(DropDatabasePreEvent("db5") :: DropDatabaseEvent("db5") :: Nil)
+
+ catalog.dropDatabase("db4", ignoreIfNotExists = true, cascade = false)
+ checkEvents(DropDatabasePreEvent("db4") :: DropDatabaseEvent("db4") :: Nil)
+ }
+
+ testWithCatalog("table") { (catalog, checkEvents) =>
+ val path1 = Files.createTempDirectory("db_")
+ val path2 = Files.createTempDirectory(path1, "tbl_")
+ val uri1 = preparePath(path1)
+ val uri2 = preparePath(path2)
+
+ // CREATE
+ val dbDefinition = createDbDefinition(uri1)
+
+ val storage = CatalogStorageFormat.empty.copy(
+ locationUri = Option(uri2))
+ val tableDefinition = CatalogTable(
+ identifier = TableIdentifier("tbl1", Some("db5")),
+ tableType = CatalogTableType.MANAGED,
+ storage = storage,
+ schema = new StructType().add("id", "long"))
+
+ catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+ checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+ catalog.createTable(tableDefinition, ignoreIfExists = false)
+ checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
+
+ catalog.createTable(tableDefinition, ignoreIfExists = true)
+ checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
+
+ intercept[AnalysisException] {
+ catalog.createTable(tableDefinition, ignoreIfExists = false)
+ }
+ checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)
+
+ // RENAME
+ catalog.renameTable("db5", "tbl1", "tbl2")
+ checkEvents(
+ RenameTablePreEvent("db5", "tbl1", "tbl2") ::
+ RenameTableEvent("db5", "tbl1", "tbl2") :: Nil)
+
+ intercept[AnalysisException] {
+ catalog.renameTable("db5", "tbl1", "tbl2")
+ }
+ checkEvents(RenameTablePreEvent("db5", "tbl1", "tbl2") :: Nil)
+
+ // DROP
+ intercept[AnalysisException] {
+ catalog.dropTable("db5", "tbl1", ignoreIfNotExists = false, purge = true)
+ }
+ checkEvents(DropTablePreEvent("db5", "tbl1") :: Nil)
+
+ catalog.dropTable("db5", "tbl2", ignoreIfNotExists = false, purge = true)
+ checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
+
+ catalog.dropTable("db5", "tbl2", ignoreIfNotExists = true, purge = true)
+ checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
+ }
+
+ testWithCatalog("function") { (catalog, checkEvents) =>
+ // CREATE
+ val dbDefinition = createDbDefinition()
+
+ val functionDefinition = CatalogFunction(
+ identifier = FunctionIdentifier("fn7", Some("db5")),
+ className = "",
+ resources = Seq.empty)
+
+ val newIdentifier = functionDefinition.identifier.copy(funcName = "fn4")
+ val renamedFunctionDefinition = functionDefinition.copy(identifier = newIdentifier)
+
+ catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+ checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+ catalog.createFunction("db5", functionDefinition)
+ checkEvents(CreateFunctionPreEvent("db5", "fn7") :: CreateFunctionEvent("db5", "fn7") :: Nil)
+
+ intercept[AnalysisException] {
+ catalog.createFunction("db5", functionDefinition)
+ }
+ checkEvents(CreateFunctionPreEvent("db5", "fn7") :: Nil)
+
+ // RENAME
+ catalog.renameFunction("db5", "fn7", "fn4")
+ checkEvents(
+ RenameFunctionPreEvent("db5", "fn7", "fn4") ::
+ RenameFunctionEvent("db5", "fn7", "fn4") :: Nil)
+ intercept[AnalysisException] {
+ catalog.renameFunction("db5", "fn7", "fn4")
+ }
+ checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)
+
+ // DROP
+ intercept[AnalysisException] {
+ catalog.dropFunction("db5", "fn7")
+ }
+ checkEvents(DropFunctionPreEvent("db5", "fn7") :: Nil)
+
+ catalog.dropFunction("db5", "fn4")
+ checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d06dbaa2d0..f834569e59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -109,6 +109,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
}
+ // Make sure we propagate external catalog events to the spark listener bus
+ externalCatalog.addListener(new ExternalCatalogEventListener {
+ override def onEvent(event: ExternalCatalogEvent): Unit = {
+ sparkContext.listenerBus.post(event)
+ }
+ })
+
/**
* A manager for global temporary views.
*/
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8b0fdf49ce..71e33c46b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -141,13 +141,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Databases
// --------------------------------------------------------------------------
- override def createDatabase(
+ override protected def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClient {
client.createDatabase(dbDefinition, ignoreIfExists)
}
- override def dropDatabase(
+ override protected def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withClient {
@@ -194,7 +194,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Tables
// --------------------------------------------------------------------------
- override def createTable(
+ override protected def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
@@ -456,7 +456,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}
- override def dropTable(
+ override protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
@@ -465,7 +465,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.dropTable(db, table, ignoreIfNotExists, purge)
}
- override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+ override protected def doRenameTable(
+ db: String,
+ oldName: String,
+ newName: String): Unit = withClient {
val rawTable = getRawTable(db, oldName)
// Note that Hive serde tables don't use path option in storage properties to store the value
@@ -1056,7 +1059,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Functions
// --------------------------------------------------------------------------
- override def createFunction(
+ override protected def doCreateFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
requireDbExists(db)
@@ -1069,12 +1072,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
- override def dropFunction(db: String, name: String): Unit = withClient {
+ override protected def doDropFunction(db: String, name: String): Unit = withClient {
requireFunctionExists(db, name)
client.dropFunction(db, name)
}
- override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+ override protected def doRenameFunction(
+ db: String,
+ oldName: String,
+ newName: String): Unit = withClient {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
client.renameFunction(db, oldName, newName)