aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala85
1 files changed, 76 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 08a01e8601..974ef900e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ListenerBus
/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
-abstract class ExternalCatalog {
+abstract class ExternalCatalog
+ extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec
protected def requireDbExists(db: String): Unit = {
@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+ final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+ val db = dbDefinition.name
+ postToAll(CreateDatabasePreEvent(db))
+ doCreateDatabase(dbDefinition, ignoreIfExists)
+ postToAll(CreateDatabaseEvent(db))
+ }
+
+ protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+ final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+ postToAll(DropDatabasePreEvent(db))
+ doDropDatabase(db, ignoreIfNotExists, cascade)
+ postToAll(DropDatabaseEvent(db))
+ }
- def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+ protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Alter a database whose name matches the one specified in `dbDefinition`,
@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------
- def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
+ final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ val db = tableDefinition.database
+ val name = tableDefinition.identifier.table
+ postToAll(CreateTablePreEvent(db, name))
+ doCreateTable(tableDefinition, ignoreIfExists)
+ postToAll(CreateTableEvent(db, name))
+ }
- def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
+ protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
- def renameTable(db: String, oldName: String, newName: String): Unit
+ final def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ postToAll(DropTablePreEvent(db, table))
+ doDropTable(db, table, ignoreIfNotExists, purge)
+ postToAll(DropTableEvent(db, table))
+ }
+
+ protected def doDropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
+
+ final def renameTable(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameTablePreEvent(db, oldName, newName))
+ doRenameTable(db, oldName, newName)
+ postToAll(RenameTableEvent(db, oldName, newName))
+ }
+
+ protected def doRenameTable(db: String, oldName: String, newName: String): Unit
/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
@@ -269,11 +312,30 @@ abstract class ExternalCatalog {
// Functions
// --------------------------------------------------------------------------
- def createFunction(db: String, funcDefinition: CatalogFunction): Unit
+ final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+ val name = funcDefinition.identifier.funcName
+ postToAll(CreateFunctionPreEvent(db, name))
+ doCreateFunction(db, funcDefinition)
+ postToAll(CreateFunctionEvent(db, name))
+ }
- def dropFunction(db: String, funcName: String): Unit
+ protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
- def renameFunction(db: String, oldName: String, newName: String): Unit
+ final def dropFunction(db: String, funcName: String): Unit = {
+ postToAll(DropFunctionPreEvent(db, funcName))
+ doDropFunction(db, funcName)
+ postToAll(DropFunctionEvent(db, funcName))
+ }
+
+ protected def doDropFunction(db: String, funcName: String): Unit
+
+ final def renameFunction(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameFunctionPreEvent(db, oldName, newName))
+ doRenameFunction(db, oldName, newName)
+ postToAll(RenameFunctionEvent(db, oldName, newName))
+ }
+
+ protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
def getFunction(db: String, funcName: String): CatalogFunction
@@ -281,4 +343,9 @@ abstract class ExternalCatalog {
def listFunctions(db: String, pattern: String): Seq[String]
+ override protected def doPostEvent(
+ listener: ExternalCatalogEventListener,
+ event: ExternalCatalogEvent): Unit = {
+ listener.onEvent(event)
+ }
}