aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache/spark
diff options
context:
space:
mode:
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala85
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala22
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala158
3 files changed, 248 insertions, 17 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 08a01e8601..974ef900e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ListenerBus
/**
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
*
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
*/
-abstract class ExternalCatalog {
+abstract class ExternalCatalog
+ extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
import CatalogTypes.TablePartitionSpec
protected def requireDbExists(db: String): Unit = {
@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+ final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+ val db = dbDefinition.name
+ postToAll(CreateDatabasePreEvent(db))
+ doCreateDatabase(dbDefinition, ignoreIfExists)
+ postToAll(CreateDatabaseEvent(db))
+ }
+
+ protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+ final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+ postToAll(DropDatabasePreEvent(db))
+ doDropDatabase(db, ignoreIfNotExists, cascade)
+ postToAll(DropDatabaseEvent(db))
+ }
- def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+ protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
/**
* Alter a database whose name matches the one specified in `dbDefinition`,
@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
// Tables
// --------------------------------------------------------------------------
- def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
+ final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+ val db = tableDefinition.database
+ val name = tableDefinition.identifier.table
+ postToAll(CreateTablePreEvent(db, name))
+ doCreateTable(tableDefinition, ignoreIfExists)
+ postToAll(CreateTableEvent(db, name))
+ }
- def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
+ protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
- def renameTable(db: String, oldName: String, newName: String): Unit
+ final def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ postToAll(DropTablePreEvent(db, table))
+ doDropTable(db, table, ignoreIfNotExists, purge)
+ postToAll(DropTableEvent(db, table))
+ }
+
+ protected def doDropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
+
+ final def renameTable(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameTablePreEvent(db, oldName, newName))
+ doRenameTable(db, oldName, newName)
+ postToAll(RenameTableEvent(db, oldName, newName))
+ }
+
+ protected def doRenameTable(db: String, oldName: String, newName: String): Unit
/**
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
@@ -269,11 +312,30 @@ abstract class ExternalCatalog {
// Functions
// --------------------------------------------------------------------------
- def createFunction(db: String, funcDefinition: CatalogFunction): Unit
+ final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+ val name = funcDefinition.identifier.funcName
+ postToAll(CreateFunctionPreEvent(db, name))
+ doCreateFunction(db, funcDefinition)
+ postToAll(CreateFunctionEvent(db, name))
+ }
- def dropFunction(db: String, funcName: String): Unit
+ protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
- def renameFunction(db: String, oldName: String, newName: String): Unit
+ final def dropFunction(db: String, funcName: String): Unit = {
+ postToAll(DropFunctionPreEvent(db, funcName))
+ doDropFunction(db, funcName)
+ postToAll(DropFunctionEvent(db, funcName))
+ }
+
+ protected def doDropFunction(db: String, funcName: String): Unit
+
+ final def renameFunction(db: String, oldName: String, newName: String): Unit = {
+ postToAll(RenameFunctionPreEvent(db, oldName, newName))
+ doRenameFunction(db, oldName, newName)
+ postToAll(RenameFunctionEvent(db, oldName, newName))
+ }
+
+ protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
def getFunction(db: String, funcName: String): CatalogFunction
@@ -281,4 +343,9 @@ abstract class ExternalCatalog {
def listFunctions(db: String, pattern: String): Seq[String]
+ override protected def doPostEvent(
+ listener: ExternalCatalogEventListener,
+ event: ExternalCatalogEvent): Unit = {
+ listener.onEvent(event)
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9ca1c71d1d..81dd8efc00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -98,7 +98,7 @@ class InMemoryCatalog(
// Databases
// --------------------------------------------------------------------------
- override def createDatabase(
+ override protected def doCreateDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
@@ -119,7 +119,7 @@ class InMemoryCatalog(
}
}
- override def dropDatabase(
+ override protected def doDropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = synchronized {
@@ -180,7 +180,7 @@ class InMemoryCatalog(
// Tables
// --------------------------------------------------------------------------
- override def createTable(
+ override protected def doCreateTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean): Unit = synchronized {
assert(tableDefinition.identifier.database.isDefined)
@@ -221,7 +221,7 @@ class InMemoryCatalog(
}
}
- override def dropTable(
+ override protected def doDropTable(
db: String,
table: String,
ignoreIfNotExists: Boolean,
@@ -264,7 +264,10 @@ class InMemoryCatalog(
}
}
- override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+ override protected def doRenameTable(
+ db: String,
+ oldName: String,
+ newName: String): Unit = synchronized {
requireTableExists(db, oldName)
requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
@@ -565,18 +568,21 @@ class InMemoryCatalog(
// Functions
// --------------------------------------------------------------------------
- override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+ override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
requireFunctionNotExists(db, func.identifier.funcName)
catalog(db).functions.put(func.identifier.funcName, func)
}
- override def dropFunction(db: String, funcName: String): Unit = synchronized {
+ override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions.remove(funcName)
}
- override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+ override protected def doRenameFunction(
+ db: String,
+ oldName: String,
+ newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
new file mode 100644
index 0000000000..459973a13b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * Event emitted by the external catalog when it is modified. Events are either fired before or
+ * after the modification (the event should document this).
+ */
+trait ExternalCatalogEvent extends SparkListenerEvent
+
+/**
+ * Listener interface for external catalog modification events.
+ */
+trait ExternalCatalogEventListener {
+ def onEvent(event: ExternalCatalogEvent): Unit
+}
+
+/**
+ * Event fired when a database is create or dropped.
+ */
+trait DatabaseEvent extends ExternalCatalogEvent {
+ /**
+ * Database of the object that was touched.
+ */
+ val database: String
+}
+
+/**
+ * Event fired before a database is created.
+ */
+case class CreateDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been created.
+ */
+case class CreateDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired before a database is dropped.
+ */
+case class DropDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been dropped.
+ */
+case class DropDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired when a table is created, dropped or renamed.
+ */
+trait TableEvent extends DatabaseEvent {
+ /**
+ * Name of the table that was touched.
+ */
+ val name: String
+}
+
+/**
+ * Event fired before a table is created.
+ */
+case class CreateTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been created.
+ */
+case class CreateTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is dropped.
+ */
+case class DropTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been dropped.
+ */
+case class DropTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is renamed.
+ */
+case class RenameTablePreEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends TableEvent
+
+/**
+ * Event fired after a table has been renamed.
+ */
+case class RenameTableEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped or renamed.
+ */
+trait FunctionEvent extends DatabaseEvent {
+ /**
+ * Name of the function that was touched.
+ */
+ val name: String
+}
+
+/**
+ * Event fired before a function is created.
+ */
+case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been created.
+ */
+case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is dropped.
+ */
+case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been dropped.
+ */
+case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is renamed.
+ */
+case class RenameFunctionPreEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends FunctionEvent
+
+/**
+ * Event fired after a function has been renamed.
+ */
+case class RenameFunctionEvent(
+ database: String,
+ name: String,
+ newName: String)
+ extends FunctionEvent