about summary refs log tree commit diff
path: root/sql/catalyst
diff options
context:
space:
mode:
author	Andrew Or <andrew@databricks.com>	2016-04-07 16:23:17 -0700
committer	Yin Huai <yhuai@databricks.com>	2016-04-07 16:23:17 -0700
commit	ae1db91d158d1ae62a0ab7ea74467679ca050101 (patch)
tree	eea4630e6ac9b3b2a2a9b17b2ef510f39ab0e954 /sql/catalyst
parent	aa852215f82876977d164f371627e894e86baacc (diff)
download	spark-ae1db91d158d1ae62a0ab7ea74467679ca050101.tar.gz
spark-ae1db91d158d1ae62a0ab7ea74467679ca050101.tar.bz2
spark-ae1db91d158d1ae62a0ab7ea74467679ca050101.zip
[SPARK-14410][SQL] Push functions existence check into catalog
## What changes were proposed in this pull request?

This is a followup to #12117 and addresses some of the TODOs introduced there. In particular, the resolution of database is now pushed into session catalog, which knows about the current database. Further, the logic for checking whether a function exists is pushed into the external catalog. No change in functionality is expected.

## How was this patch tested?

`SessionCatalogSuite`, `DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12198 from andrewor14/function-exists.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala	| 10
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala	| 59
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala	| 2
-rw-r--r--	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala	| 53
4 files changed, 64 insertions(+), 60 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5d136b663f..186bbccef1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
// Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
- private def functionExists(db: String, funcName: String): Boolean = {
- requireDbExists(db)
- catalog(db).functions.contains(funcName)
- }
-
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
@@ -315,6 +310,11 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).functions(funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = {
+ requireDbExists(db)
+ catalog(db).functions.contains(funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2acf584e8f..7db9fd0527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -95,7 +95,7 @@ class SessionCatalog(
externalCatalog.alterDatabase(dbDefinition)
}
- def getDatabase(db: String): CatalogDatabase = {
+ def getDatabaseMetadata(db: String): CatalogDatabase = {
externalCatalog.getDatabase(db)
}
@@ -169,7 +169,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then an [[AnalysisException]] is thrown.
*/
- def getTable(name: TableIdentifier): CatalogTable = {
+ def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
externalCatalog.getTable(db, table)
@@ -435,28 +435,37 @@ class SessionCatalog(
* Create a metastore function in the database specified in `funcDefinition`.
* If no such database is specified, create it in the current database.
*/
- def createFunction(funcDefinition: CatalogFunction): Unit = {
+ def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
val db = funcDefinition.identifier.database.getOrElse(currentDb)
- val newFuncDefinition = funcDefinition.copy(
- identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- externalCatalog.createFunction(db, newFuncDefinition)
+ val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+ val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+ if (!functionExists(identifier)) {
+ externalCatalog.createFunction(db, newFuncDefinition)
+ } else if (!ignoreIfExists) {
+ throw new AnalysisException(s"function '$identifier' already exists in database '$db'")
+ }
}
/**
* Drop a metastore function.
* If no database is specified, assume the function is in the current database.
*/
- def dropFunction(name: FunctionIdentifier): Unit = {
+ def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
- val qualified = name.copy(database = Some(db)).unquotedString
- if (functionRegistry.functionExists(qualified)) {
- // If we have loaded this function into the FunctionRegistry,
- // also drop it from there.
- // For a permanent function, because we loaded it to the FunctionRegistry
- // when it's first used, we also need to drop it from the FunctionRegistry.
- functionRegistry.dropFunction(qualified)
+ val identifier = name.copy(database = Some(db))
+ if (functionExists(identifier)) {
+ // TODO: registry should just take in FunctionIdentifier for type safety
+ if (functionRegistry.functionExists(identifier.unquotedString)) {
+ // If we have loaded this function into the FunctionRegistry,
+ // also drop it from there.
+ // For a permanent function, because we loaded it to the FunctionRegistry
+ // when it's first used, we also need to drop it from the FunctionRegistry.
+ functionRegistry.dropFunction(identifier.unquotedString)
+ }
+ externalCatalog.dropFunction(db, name.funcName)
+ } else if (!ignoreIfNotExists) {
+ throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
}
- externalCatalog.dropFunction(db, name.funcName)
}
/**
@@ -465,8 +474,7 @@ class SessionCatalog(
* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
- // TODO: have a better name. This method is actually for fetching the metadata of a function.
- def getFunction(name: FunctionIdentifier): CatalogFunction = {
+ def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
externalCatalog.getFunction(db, name.funcName)
}
@@ -475,20 +483,9 @@ class SessionCatalog(
* Check if the specified function exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
- if (functionRegistry.functionExists(name.unquotedString)) {
- // This function exists in the FunctionRegistry.
- true
- } else {
- // Need to check if this function exists in the metastore.
- try {
- // TODO: It's better to ask external catalog if this function exists.
- // So, we can avoid of having this hacky try/catch block.
- getFunction(name) != null
- } catch {
- case _: NoSuchFunctionException => false
- case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
- }
- }
+ val db = name.database.getOrElse(currentDb)
+ functionRegistry.functionExists(name.unquotedString) ||
+ externalCatalog.functionExists(db, name.funcName)
}
// ----------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 97b9946140..e29d6bd8b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -152,6 +152,8 @@ abstract class ExternalCatalog {
def getFunction(db: String, funcName: String): CatalogFunction
+ def functionExists(db: String, funcName: String): Boolean
+
def listFunctions(db: String, pattern: String): Seq[String]
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d56d001b3..1850dc8156 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -62,7 +62,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database when a database exists") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
assert(db1.name == "db1")
assert(db1.description.contains("db1"))
}
@@ -70,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database should throw exception when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getDatabase("db_that_does_not_exist")
+ catalog.getDatabaseMetadata("db_that_does_not_exist")
}
}
@@ -128,10 +128,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter database") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
// Note: alter properties here because Hive does not support altering other fields
catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
- val newDb1 = catalog.getDatabase("db1")
+ val newDb1 = catalog.getDatabaseMetadata("db1")
assert(db1.properties.isEmpty)
assert(newDb1.properties.size == 2)
assert(newDb1.properties.get("k") == Some("v3"))
@@ -346,21 +346,21 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get table") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
== externalCatalog.getTable("db2", "tbl1"))
// Get table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
== externalCatalog.getTable("db2", "tbl1"))
}
test("get table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+ catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
}
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+ catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
}
}
@@ -386,7 +386,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
- val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
val relationWithAlias =
SubqueryAlias(alias,
@@ -659,26 +659,28 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newEmptyCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+ sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
// Create function without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
- sessionCatalog.createFunction(newFunc("myfunc2"))
+ sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
}
test("create function when database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+ catalog.createFunction(
+ newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
}
}
test("create function that already exists") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func1", Some("db2")))
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
}
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
}
test("create temp function") {
@@ -711,24 +713,27 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
- sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+ sessionCatalog.dropFunction(
+ FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
// Drop function without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+ sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
- sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+ sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
}
test("drop function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+ catalog.dropFunction(
+ FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
}
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
}
test("drop temp function") {
@@ -753,19 +758,19 @@ class SessionCatalogSuite extends SparkFunSuite {
val expected =
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[(String, String)])
- assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
- assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected)
}
test("get function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+ catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
}
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+ catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
}
}
@@ -787,8 +792,8 @@ class SessionCatalogSuite extends SparkFunSuite {
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
- catalog.createFunction(newFunc("func2", Some("db2")))
- catalog.createFunction(newFunc("not_me", Some("db2")))
+ catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
+ catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.listFunctions("db1", "*").toSet ==