author     Andrew Or <andrew@databricks.com>    2016-04-07 16:23:17 -0700
committer  Yin Huai <yhuai@databricks.com>      2016-04-07 16:23:17 -0700
commit     ae1db91d158d1ae62a0ab7ea74467679ca050101 (patch)
tree       eea4630e6ac9b3b2a2a9b17b2ef510f39ab0e954
parent     aa852215f82876977d164f371627e894e86baacc (diff)
[SPARK-14410][SQL] Push functions existence check into catalog
## What changes were proposed in this pull request?

This is a followup to #12117 and addresses some of the TODOs introduced there. In particular, the resolution of the database is now pushed into the session catalog, which knows about the current database. Further, the logic for checking whether a function exists is pushed into the external catalog. No change in functionality is expected.

## How was this patch tested?

`SessionCatalogSuite`, `DDLSuite`

Author: Andrew Or <andrew@databricks.com>

Closes #12198 from andrewor14/function-exists.
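To make the new division of responsibilities concrete, the sketch below is a minimal, self-contained Scala illustration of the pattern this patch introduces (simplified stand-in types, not the actual Spark classes, and a plain `IllegalArgumentException` in place of `AnalysisException`): the session catalog resolves the current database and delegates the existence check to the external catalog, while `createFunction`/`dropFunction` take explicit `ignoreIfExists`/`ignoreIfNotExists` flags instead of leaving the check to callers.

```scala
// Minimal sketch of the catalog delegation introduced by this patch.
// Names are simplified stand-ins, not the real Spark classes.

case class FunctionIdentifier(funcName: String, database: Option[String] = None) {
  def unquotedString: String = database.map(db => s"$db.$funcName").getOrElse(funcName)
}

trait ExternalCatalog {
  def createFunction(db: String, funcName: String): Unit
  def dropFunction(db: String, funcName: String): Unit
  // The existence check now lives in the external catalog itself.
  def functionExists(db: String, funcName: String): Boolean
}

class InMemoryCatalog extends ExternalCatalog {
  private val functions =
    scala.collection.mutable.Map.empty[String, scala.collection.mutable.Set[String]]
  override def createFunction(db: String, funcName: String): Unit =
    functions.getOrElseUpdate(db, scala.collection.mutable.Set.empty) += funcName
  override def dropFunction(db: String, funcName: String): Unit =
    functions.get(db).foreach(_ -= funcName)
  override def functionExists(db: String, funcName: String): Boolean =
    functions.get(db).exists(_.contains(funcName))
}

class SessionCatalog(externalCatalog: ExternalCatalog, private var currentDb: String = "default") {
  private val functionRegistry = scala.collection.mutable.Set.empty[String] // temp functions

  // Database resolution happens here; the existence check is pushed down.
  def functionExists(name: FunctionIdentifier): Boolean = {
    val db = name.database.getOrElse(currentDb)
    functionRegistry.contains(name.unquotedString) ||
      externalCatalog.functionExists(db, name.funcName)
  }

  def createFunction(name: FunctionIdentifier, ignoreIfExists: Boolean): Unit = {
    val db = name.database.getOrElse(currentDb)
    val qualified = name.copy(database = Some(db))
    if (!functionExists(qualified)) {
      externalCatalog.createFunction(db, qualified.funcName)
    } else if (!ignoreIfExists) {
      throw new IllegalArgumentException(
        s"function '${qualified.unquotedString}' already exists in database '$db'")
    }
  }

  def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
    val db = name.database.getOrElse(currentDb)
    val qualified = name.copy(database = Some(db))
    if (functionExists(qualified)) {
      functionRegistry -= qualified.unquotedString
      externalCatalog.dropFunction(db, qualified.funcName)
    } else if (!ignoreIfNotExists) {
      throw new IllegalArgumentException(
        s"function '${qualified.unquotedString}' does not exist in database '$db'")
    }
  }
}

object CatalogDemo extends App {
  val catalog = new SessionCatalog(new InMemoryCatalog)
  catalog.createFunction(FunctionIdentifier("myfunc"), ignoreIfExists = false)
  println(catalog.functionExists(FunctionIdentifier("myfunc")))   // true
  catalog.dropFunction(FunctionIdentifier("myfunc"), ignoreIfNotExists = false)
  println(catalog.functionExists(FunctionIdentifier("myfunc")))   // false
}
```

The actual diff below follows the same shape: `ExternalCatalog.functionExists` becomes part of the catalog interface, `SessionCatalog` resolves the database before delegating, and the DDL commands simply pass their `IF EXISTS` flags through.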
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala      | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       | 59
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala             |  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala   | 53
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala                 |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                      | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala                | 31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                 | 41
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                   |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                     |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                 | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala             |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala                  |  2
13 files changed, 126 insertions(+), 114 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5d136b663f..186bbccef1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
// Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
- private def functionExists(db: String, funcName: String): Boolean = {
- requireDbExists(db)
- catalog(db).functions.contains(funcName)
- }
-
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
@@ -315,6 +310,11 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).functions(funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = {
+ requireDbExists(db)
+ catalog(db).functions.contains(funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2acf584e8f..7db9fd0527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -95,7 +95,7 @@ class SessionCatalog(
externalCatalog.alterDatabase(dbDefinition)
}
- def getDatabase(db: String): CatalogDatabase = {
+ def getDatabaseMetadata(db: String): CatalogDatabase = {
externalCatalog.getDatabase(db)
}
@@ -169,7 +169,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then an [[AnalysisException]] is thrown.
*/
- def getTable(name: TableIdentifier): CatalogTable = {
+ def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
externalCatalog.getTable(db, table)
@@ -435,28 +435,37 @@ class SessionCatalog(
* Create a metastore function in the database specified in `funcDefinition`.
* If no such database is specified, create it in the current database.
*/
- def createFunction(funcDefinition: CatalogFunction): Unit = {
+ def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
val db = funcDefinition.identifier.database.getOrElse(currentDb)
- val newFuncDefinition = funcDefinition.copy(
- identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- externalCatalog.createFunction(db, newFuncDefinition)
+ val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+ val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+ if (!functionExists(identifier)) {
+ externalCatalog.createFunction(db, newFuncDefinition)
+ } else if (!ignoreIfExists) {
+ throw new AnalysisException(s"function '$identifier' already exists in database '$db'")
+ }
}
/**
* Drop a metastore function.
* If no database is specified, assume the function is in the current database.
*/
- def dropFunction(name: FunctionIdentifier): Unit = {
+ def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
- val qualified = name.copy(database = Some(db)).unquotedString
- if (functionRegistry.functionExists(qualified)) {
- // If we have loaded this function into the FunctionRegistry,
- // also drop it from there.
- // For a permanent function, because we loaded it to the FunctionRegistry
- // when it's first used, we also need to drop it from the FunctionRegistry.
- functionRegistry.dropFunction(qualified)
+ val identifier = name.copy(database = Some(db))
+ if (functionExists(identifier)) {
+ // TODO: registry should just take in FunctionIdentifier for type safety
+ if (functionRegistry.functionExists(identifier.unquotedString)) {
+ // If we have loaded this function into the FunctionRegistry,
+ // also drop it from there.
+ // For a permanent function, because we loaded it to the FunctionRegistry
+ // when it's first used, we also need to drop it from the FunctionRegistry.
+ functionRegistry.dropFunction(identifier.unquotedString)
+ }
+ externalCatalog.dropFunction(db, name.funcName)
+ } else if (!ignoreIfNotExists) {
+ throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
}
- externalCatalog.dropFunction(db, name.funcName)
}
/**
@@ -465,8 +474,7 @@ class SessionCatalog(
* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
- // TODO: have a better name. This method is actually for fetching the metadata of a function.
- def getFunction(name: FunctionIdentifier): CatalogFunction = {
+ def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
externalCatalog.getFunction(db, name.funcName)
}
@@ -475,20 +483,9 @@ class SessionCatalog(
* Check if the specified function exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
- if (functionRegistry.functionExists(name.unquotedString)) {
- // This function exists in the FunctionRegistry.
- true
- } else {
- // Need to check if this function exists in the metastore.
- try {
- // TODO: It's better to ask external catalog if this function exists.
- // So, we can avoid of having this hacky try/catch block.
- getFunction(name) != null
- } catch {
- case _: NoSuchFunctionException => false
- case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
- }
- }
+ val db = name.database.getOrElse(currentDb)
+ functionRegistry.functionExists(name.unquotedString) ||
+ externalCatalog.functionExists(db, name.funcName)
}
// ----------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 97b9946140..e29d6bd8b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -152,6 +152,8 @@ abstract class ExternalCatalog {
def getFunction(db: String, funcName: String): CatalogFunction
+ def functionExists(db: String, funcName: String): Boolean
+
def listFunctions(db: String, pattern: String): Seq[String]
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d56d001b3..1850dc8156 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -62,7 +62,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database when a database exists") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
assert(db1.name == "db1")
assert(db1.description.contains("db1"))
}
@@ -70,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database should throw exception when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getDatabase("db_that_does_not_exist")
+ catalog.getDatabaseMetadata("db_that_does_not_exist")
}
}
@@ -128,10 +128,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter database") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
// Note: alter properties here because Hive does not support altering other fields
catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
- val newDb1 = catalog.getDatabase("db1")
+ val newDb1 = catalog.getDatabaseMetadata("db1")
assert(db1.properties.isEmpty)
assert(newDb1.properties.size == 2)
assert(newDb1.properties.get("k") == Some("v3"))
@@ -346,21 +346,21 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get table") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
== externalCatalog.getTable("db2", "tbl1"))
// Get table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
== externalCatalog.getTable("db2", "tbl1"))
}
test("get table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+ catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
}
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+ catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
}
}
@@ -386,7 +386,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
- val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
val relationWithAlias =
SubqueryAlias(alias,
@@ -659,26 +659,28 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newEmptyCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+ sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
// Create function without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
- sessionCatalog.createFunction(newFunc("myfunc2"))
+ sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
}
test("create function when database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+ catalog.createFunction(
+ newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
}
}
test("create function that already exists") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func1", Some("db2")))
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
}
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
}
test("create temp function") {
@@ -711,24 +713,27 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
- sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+ sessionCatalog.dropFunction(
+ FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
// Drop function without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+ sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
- sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+ sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
}
test("drop function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+ catalog.dropFunction(
+ FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
}
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
}
test("drop temp function") {
@@ -753,19 +758,19 @@ class SessionCatalogSuite extends SparkFunSuite {
val expected =
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[(String, String)])
- assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
- assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected)
}
test("get function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+ catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
}
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+ catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
}
}
@@ -787,8 +792,8 @@ class SessionCatalogSuite extends SparkFunSuite {
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
- catalog.createFunction(newFunc("func2", Some("db2")))
- catalog.createFunction(newFunc("not_me", Some("db2")))
+ catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
+ catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.listFunctions("db1", "*").toSet ==
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index faa7a2cdb4..3fd2a93d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -407,7 +407,7 @@ case class ShowTablePropertiesCommand(
if (catalog.isTemporaryTable(table)) {
Seq.empty[Row]
} else {
- val catalogTable = sqlContext.sessionState.catalog.getTable(table)
+ val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
propertyKey match {
case Some(p) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6d56a6fec8..20779d68e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -124,7 +124,7 @@ case class AlterDatabaseProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val db: CatalogDatabase = catalog.getDatabase(databaseName)
+ val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
catalog.alterDatabase(db.copy(properties = db.properties ++ props))
Seq.empty[Row]
@@ -149,7 +149,8 @@ case class DescribeDatabase(
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+ val dbMetadata: CatalogDatabase =
+ sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
@@ -213,7 +214,7 @@ case class AlterTableSetProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
val newProperties = table.properties ++ properties
if (DDLUtils.isDatasourceTable(newProperties)) {
throw new AnalysisException(
@@ -243,7 +244,7 @@ case class AlterTableUnsetProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"alter table properties is not supported for datasource tables")
@@ -286,7 +287,7 @@ case class AlterTableSerDeProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
// Do not support setting serde for datasource tables
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -376,7 +377,7 @@ case class AlterTableSetLocation(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 66d17e322e..c6e601799f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -47,6 +47,7 @@ case class CreateFunction(
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(
@@ -55,24 +56,18 @@ case class CreateFunction(
}
// We first load resources and then put the builder in the function registry.
// Please note that it is allowed to overwrite an existing temp function.
- sqlContext.sessionState.catalog.loadFunctionResources(resources)
+ catalog.loadFunctionResources(resources)
val info = new ExpressionInfo(className, functionName)
- val builder =
- sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
- sqlContext.sessionState.catalog.createTempFunction(
- functionName, info, builder, ignoreIfExists = false)
+ val builder = catalog.makeFunctionBuilder(functionName, className)
+ catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
} else {
// For a permanent, we will store the metadata into underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
- val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
- val func = FunctionIdentifier(functionName, Some(dbName))
- val catalogFunc = CatalogFunction(func, className, resources)
- if (sqlContext.sessionState.catalog.functionExists(func)) {
- throw new AnalysisException(
- s"Function '$functionName' already exists in database '$dbName'.")
- }
- sqlContext.sessionState.catalog.createFunction(catalogFunc)
+ // TODO: should we also parse "IF NOT EXISTS"?
+ catalog.createFunction(
+ CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
+ ignoreIfExists = false)
}
Seq.empty[Row]
}
@@ -101,13 +96,9 @@ case class DropFunction(
catalog.dropTempFunction(functionName, ifExists)
} else {
// We are dropping a permanent function.
- val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
- val func = FunctionIdentifier(functionName, Some(dbName))
- if (!ifExists && !catalog.functionExists(func)) {
- throw new AnalysisException(
- s"Function '$functionName' does not exist in database '$dbName'.")
- }
- catalog.dropFunction(func)
+ catalog.dropFunction(
+ FunctionIdentifier(functionName, databaseName),
+ ignoreIfNotExists = ifExists)
}
Seq.empty[Row]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a8db4e9923..7084665b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -88,7 +88,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
@@ -110,7 +110,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
@@ -233,14 +233,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
- assert(catalog.getTable(tableIdent).properties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
// set table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
- assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+ assert(catalog.getTableMetadata(tableIdent).properties ==
+ Map("andrew" -> "or14", "kor" -> "bel"))
// set table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
- assert(catalog.getTable(tableIdent).properties ==
+ assert(catalog.getTableMetadata(tableIdent).properties ==
Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
// table to alter does not exist
intercept[AnalysisException] {
@@ -262,11 +263,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
- assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+ assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
// unset table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
- assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+ assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
@@ -278,7 +279,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e.getMessage.contains("xyz"))
// property to unset does not exist, but "IF EXISTS" is specified
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
- assert(catalog.getTable(tableIdent).properties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
// throw exception for datasource tables
convertToDatasourceTable(catalog, tableIdent)
val e1 = intercept[AnalysisException] {
@@ -393,7 +394,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def convertToDatasourceTable(
catalog: SessionCatalog,
tableIdent: TableIdentifier): Unit = {
- catalog.alterTable(catalog.getTable(tableIdent).copy(
+ catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
properties = Map("spark.sql.sources.provider" -> "csv")))
}
@@ -407,15 +408,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
// Verify that the location is set to the expected string
def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
- .getOrElse { catalog.getTable(tableIdent).storage }
+ .getOrElse { catalog.getTableMetadata(tableIdent).storage }
if (isDatasourceTable) {
if (spec.isDefined) {
assert(storageFormat.serdeProperties.isEmpty)
@@ -467,8 +468,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -482,22 +483,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e2.getMessage.contains("datasource"))
} else {
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
- assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
- assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 98a5998d03..b1156fb3e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -292,6 +292,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.getFunction(db, funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = withClient {
+ client.functionExists(db, funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
client.listFunctions(db, pattern)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee56f9d75d..94794b1572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -232,6 +232,11 @@ private[hive] trait HiveClient {
/** Return an existing function in the database, or None if it doesn't exist. */
def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+ /** Return whether a function exists in the specified database. */
+ final def functionExists(db: String, name: String): Boolean = {
+ getFunctionOption(db, name).isDefined
+ }
+
/** Return the names of all functions that match the given pattern in the database. */
def listFunctions(db: String, pattern: String): Seq[String]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1f66fbfd85..d0eb9ddf50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -30,7 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
@@ -559,7 +559,11 @@ private[hive] class HiveClientImpl(
override def getFunctionOption(
db: String,
name: String): Option[CatalogFunction] = withHiveState {
- Option(client.getFunction(db, name)).map(fromHiveFunction)
+ try {
+ Option(client.getFunction(db, name)).map(fromHiveFunction)
+ } catch {
+ case he: HiveException => None
+ }
}
override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 6967395613..ada8621d07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable =
+ sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +145,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
|AS SELECT 1 AS d1, "val_1" AS d2
""".stripMargin)
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable =
+ sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd2129375d..c5417b06a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -354,7 +354,7 @@ object PermanentHiveUDFTest2 extends Logging {
FunctionIdentifier("example_max"),
"org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
("JAR" -> jar) :: Nil)
- hiveContext.sessionState.catalog.createFunction(function)
+ hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
val source =
hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
source.registerTempTable("sourceTable")