Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala      | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       | 59
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala            |  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 53
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala                |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala               | 31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                | 41
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                  |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala                    |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala            |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala                 |  2
13 files changed, 126 insertions(+), 114 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5d136b663f..186bbccef1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
// Database name -> description
private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
- private def functionExists(db: String, funcName: String): Boolean = {
- requireDbExists(db)
- catalog(db).functions.contains(funcName)
- }
-
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
@@ -315,6 +310,11 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).functions(funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = {
+ requireDbExists(db)
+ catalog(db).functions.contains(funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
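
The private helper above becomes the public override demanded by the new ExternalCatalog contract (added in interface.scala below). A minimal sketch of the resulting behavior, not part of the patch; the database, function, and class names are made up:

    import org.apache.spark.sql.catalyst.FunctionIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction, InMemoryCatalog}

    val external = new InMemoryCatalog
    external.createDatabase(
      CatalogDatabase("db1", "", "file:/tmp/db1", Map.empty), ignoreIfExists = false)
    // existence can be checked directly, with no try/catch probing of getFunction
    assert(!external.functionExists("db1", "my_udf"))
    external.createFunction(
      "db1", CatalogFunction(FunctionIdentifier("my_udf", Some("db1")), "com.example.MyUdf", Nil))
    assert(external.functionExists("db1", "my_udf"))
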
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2acf584e8f..7db9fd0527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -95,7 +95,7 @@ class SessionCatalog(
externalCatalog.alterDatabase(dbDefinition)
}
- def getDatabase(db: String): CatalogDatabase = {
+ def getDatabaseMetadata(db: String): CatalogDatabase = {
externalCatalog.getDatabase(db)
}
@@ -169,7 +169,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
* If the specified table is not found in the database then an [[AnalysisException]] is thrown.
*/
- def getTable(name: TableIdentifier): CatalogTable = {
+ def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(currentDb)
val table = formatTableName(name.table)
externalCatalog.getTable(db, table)
@@ -435,28 +435,37 @@ class SessionCatalog(
* Create a metastore function in the database specified in `funcDefinition`.
* If no such database is specified, create it in the current database.
*/
- def createFunction(funcDefinition: CatalogFunction): Unit = {
+ def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
val db = funcDefinition.identifier.database.getOrElse(currentDb)
- val newFuncDefinition = funcDefinition.copy(
- identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- externalCatalog.createFunction(db, newFuncDefinition)
+ val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+ val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+ if (!functionExists(identifier)) {
+ externalCatalog.createFunction(db, newFuncDefinition)
+ } else if (!ignoreIfExists) {
+ throw new AnalysisException(s"function '$identifier' already exists in database '$db'")
+ }
}
/**
* Drop a metastore function.
* If no database is specified, assume the function is in the current database.
*/
- def dropFunction(name: FunctionIdentifier): Unit = {
+ def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(currentDb)
- val qualified = name.copy(database = Some(db)).unquotedString
- if (functionRegistry.functionExists(qualified)) {
- // If we have loaded this function into the FunctionRegistry,
- // also drop it from there.
- // For a permanent function, because we loaded it to the FunctionRegistry
- // when it's first used, we also need to drop it from the FunctionRegistry.
- functionRegistry.dropFunction(qualified)
+ val identifier = name.copy(database = Some(db))
+ if (functionExists(identifier)) {
+ // TODO: registry should just take in FunctionIdentifier for type safety
+ if (functionRegistry.functionExists(identifier.unquotedString)) {
+ // If we have loaded this function into the FunctionRegistry,
+ // also drop it from there.
+ // For a permanent function, because we loaded it to the FunctionRegistry
+ // when it's first used, we also need to drop it from the FunctionRegistry.
+ functionRegistry.dropFunction(identifier.unquotedString)
+ }
+ externalCatalog.dropFunction(db, name.funcName)
+ } else if (!ignoreIfNotExists) {
+ throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
}
- externalCatalog.dropFunction(db, name.funcName)
}
/**
@@ -465,8 +474,7 @@ class SessionCatalog(
* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
- // TODO: have a better name. This method is actually for fetching the metadata of a function.
- def getFunction(name: FunctionIdentifier): CatalogFunction = {
+ def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
externalCatalog.getFunction(db, name.funcName)
}
@@ -475,20 +483,9 @@ class SessionCatalog(
* Check if the specified function exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
- if (functionRegistry.functionExists(name.unquotedString)) {
- // This function exists in the FunctionRegistry.
- true
- } else {
- // Need to check if this function exists in the metastore.
- try {
- // TODO: It's better to ask external catalog if this function exists.
- // So, we can avoid of having this hacky try/catch block.
- getFunction(name) != null
- } catch {
- case _: NoSuchFunctionException => false
- case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
- }
- }
+ val db = name.database.getOrElse(currentDb)
+ functionRegistry.functionExists(name.unquotedString) ||
+ externalCatalog.functionExists(db, name.funcName)
}
// ----------------------------------------------------------------
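
The net effect on the SessionCatalog API is sketched below (illustrative only, not part of the patch; the database, function, and class names are invented, and the "default" database is created explicitly so the snippet is self-contained):

    import org.apache.spark.sql.catalyst.FunctionIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction, InMemoryCatalog, SessionCatalog}

    val catalog = new SessionCatalog(new InMemoryCatalog)
    catalog.createDatabase(
      CatalogDatabase("default", "", "file:/tmp/default", Map.empty), ignoreIfExists = true)

    val fn = CatalogFunction(FunctionIdentifier("my_udf"), "com.example.MyUdf", Nil)
    catalog.createFunction(fn, ignoreIfExists = false)
    catalog.createFunction(fn, ignoreIfExists = true)    // no-op instead of AnalysisException

    // functionExists now consults the FunctionRegistry first, then the external catalog directly
    assert(catalog.functionExists(FunctionIdentifier("my_udf")))

    catalog.dropFunction(FunctionIdentifier("my_udf"), ignoreIfNotExists = false)
    catalog.dropFunction(FunctionIdentifier("my_udf"), ignoreIfNotExists = true)   // also a no-op
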
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 97b9946140..e29d6bd8b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -152,6 +152,8 @@ abstract class ExternalCatalog {
def getFunction(db: String, funcName: String): CatalogFunction
+ def functionExists(db: String, funcName: String): Boolean
+
def listFunctions(db: String, pattern: String): Seq[String]
}
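
Every concrete catalog now has to answer existence checks itself instead of callers probing getFunction and catching exceptions. A hypothetical implementation might look like the sketch below, mirroring the InMemoryCatalog and HiveExternalCatalog changes in this patch; `metastoreClient` and its `lookupFunction` method are invented stand-ins for whatever backing store such a catalog wraps:

    override def functionExists(db: String, funcName: String): Boolean = {
      // `metastoreClient` is a made-up stand-in for the catalog's backing store
      metastoreClient.lookupFunction(db, funcName).isDefined
    }
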
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d56d001b3..1850dc8156 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -62,7 +62,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database when a database exists") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
assert(db1.name == "db1")
assert(db1.description.contains("db1"))
}
@@ -70,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get database should throw exception when the database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getDatabase("db_that_does_not_exist")
+ catalog.getDatabaseMetadata("db_that_does_not_exist")
}
}
@@ -128,10 +128,10 @@ class SessionCatalogSuite extends SparkFunSuite {
test("alter database") {
val catalog = new SessionCatalog(newBasicCatalog())
- val db1 = catalog.getDatabase("db1")
+ val db1 = catalog.getDatabaseMetadata("db1")
// Note: alter properties here because Hive does not support altering other fields
catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
- val newDb1 = catalog.getDatabase("db1")
+ val newDb1 = catalog.getDatabaseMetadata("db1")
assert(db1.properties.isEmpty)
assert(newDb1.properties.size == 2)
assert(newDb1.properties.get("k") == Some("v3"))
@@ -346,21 +346,21 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get table") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
- assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
== externalCatalog.getTable("db2", "tbl1"))
// Get table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+ assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
== externalCatalog.getTable("db2", "tbl1"))
}
test("get table when database/table does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+ catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
}
intercept[AnalysisException] {
- catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+ catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
}
}
@@ -386,7 +386,7 @@ class SessionCatalogSuite extends SparkFunSuite {
test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
- val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+ val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
val relationWithAlias =
SubqueryAlias(alias,
@@ -659,26 +659,28 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newEmptyCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
- sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+ sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
// Create function without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
- sessionCatalog.createFunction(newFunc("myfunc2"))
+ sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
}
test("create function when database does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+ catalog.createFunction(
+ newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
}
}
test("create function that already exists") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.createFunction(newFunc("func1", Some("db2")))
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
}
+ catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
}
test("create temp function") {
@@ -711,24 +713,27 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
- sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+ sessionCatalog.dropFunction(
+ FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
// Drop function without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+ sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
- sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+ sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false)
assert(externalCatalog.listFunctions("db2", "*").isEmpty)
}
test("drop function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+ catalog.dropFunction(
+ FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
}
intercept[AnalysisException] {
- catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
}
+ catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
}
test("drop temp function") {
@@ -753,19 +758,19 @@ class SessionCatalogSuite extends SparkFunSuite {
val expected =
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[(String, String)])
- assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
- assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+ assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected)
}
test("get function when database/function does not exist") {
val catalog = new SessionCatalog(newBasicCatalog())
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+ catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
}
intercept[AnalysisException] {
- catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+ catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
}
}
@@ -787,8 +792,8 @@ class SessionCatalogSuite extends SparkFunSuite {
val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
- catalog.createFunction(newFunc("func2", Some("db2")))
- catalog.createFunction(newFunc("not_me", Some("db2")))
+ catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
+ catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.listFunctions("db1", "*").toSet ==
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index faa7a2cdb4..3fd2a93d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -407,7 +407,7 @@ case class ShowTablePropertiesCommand(
if (catalog.isTemporaryTable(table)) {
Seq.empty[Row]
} else {
- val catalogTable = sqlContext.sessionState.catalog.getTable(table)
+ val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)
propertyKey match {
case Some(p) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6d56a6fec8..20779d68e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -124,7 +124,7 @@ case class AlterDatabaseProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val db: CatalogDatabase = catalog.getDatabase(databaseName)
+ val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
catalog.alterDatabase(db.copy(properties = db.properties ++ props))
Seq.empty[Row]
@@ -149,7 +149,8 @@ case class DescribeDatabase(
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+ val dbMetadata: CatalogDatabase =
+ sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
@@ -213,7 +214,7 @@ case class AlterTableSetProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
val newProperties = table.properties ++ properties
if (DDLUtils.isDatasourceTable(newProperties)) {
throw new AnalysisException(
@@ -243,7 +244,7 @@ case class AlterTableUnsetProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"alter table properties is not supported for datasource tables")
@@ -286,7 +287,7 @@ case class AlterTableSerDeProperties(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
// Do not support setting serde for datasource tables
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
@@ -376,7 +377,7 @@ case class AlterTableSetLocation(
override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
- val table = catalog.getTable(tableName)
+ val table = catalog.getTableMetadata(tableName)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 66d17e322e..c6e601799f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -47,6 +47,7 @@ case class CreateFunction(
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(
@@ -55,24 +56,18 @@ case class CreateFunction(
}
// We first load resources and then put the builder in the function registry.
// Please note that it is allowed to overwrite an existing temp function.
- sqlContext.sessionState.catalog.loadFunctionResources(resources)
+ catalog.loadFunctionResources(resources)
val info = new ExpressionInfo(className, functionName)
- val builder =
- sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
- sqlContext.sessionState.catalog.createTempFunction(
- functionName, info, builder, ignoreIfExists = false)
+ val builder = catalog.makeFunctionBuilder(functionName, className)
+ catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
} else {
// For a permanent function, we will store the metadata in the underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
- val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
- val func = FunctionIdentifier(functionName, Some(dbName))
- val catalogFunc = CatalogFunction(func, className, resources)
- if (sqlContext.sessionState.catalog.functionExists(func)) {
- throw new AnalysisException(
- s"Function '$functionName' already exists in database '$dbName'.")
- }
- sqlContext.sessionState.catalog.createFunction(catalogFunc)
+ // TODO: should we also parse "IF NOT EXISTS"?
+ catalog.createFunction(
+ CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
+ ignoreIfExists = false)
}
Seq.empty[Row]
}
@@ -101,13 +96,9 @@ case class DropFunction(
catalog.dropTempFunction(functionName, ifExists)
} else {
// We are dropping a permanent function.
- val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
- val func = FunctionIdentifier(functionName, Some(dbName))
- if (!ifExists && !catalog.functionExists(func)) {
- throw new AnalysisException(
- s"Function '$functionName' does not exist in database '$dbName'.")
- }
- catalog.dropFunction(func)
+ catalog.dropFunction(
+ FunctionIdentifier(functionName, databaseName),
+ ignoreIfNotExists = ifExists)
}
Seq.empty[Row]
}
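
A rough SQL-level illustration of the two paths above (not part of the patch; it assumes a SQLContext in scope and a UDF class com.example.SimpleUDF on the classpath, both of which are made up):

    // temporary function: resources are loaded, then the builder goes straight into the registry
    sqlContext.sql("CREATE TEMPORARY FUNCTION my_temp AS 'com.example.SimpleUDF'")

    // permanent function: only metadata is written to the external catalog; a duplicate name
    // now surfaces as an AnalysisException thrown by SessionCatalog.createFunction
    sqlContext.sql("CREATE FUNCTION mydb.my_perm AS 'com.example.SimpleUDF'")

    // DROP FUNCTION's IF EXISTS flag is forwarded to the catalog as ignoreIfNotExists
    sqlContext.sql("DROP FUNCTION IF EXISTS mydb.my_perm")
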
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a8db4e9923..7084665b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -88,7 +88,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
@@ -110,7 +110,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
@@ -233,14 +233,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
- assert(catalog.getTable(tableIdent).properties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
// set table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
- assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+ assert(catalog.getTableMetadata(tableIdent).properties ==
+ Map("andrew" -> "or14", "kor" -> "bel"))
// set table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
- assert(catalog.getTable(tableIdent).properties ==
+ assert(catalog.getTableMetadata(tableIdent).properties ==
Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
// table to alter does not exist
intercept[AnalysisException] {
@@ -262,11 +263,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
- assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+ assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
// unset table properties without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
- assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+ assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
@@ -278,7 +279,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e.getMessage.contains("xyz"))
// property to unset does not exist, but "IF EXISTS" is specified
sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
- assert(catalog.getTable(tableIdent).properties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
// throw exception for datasource tables
convertToDatasourceTable(catalog, tableIdent)
val e1 = intercept[AnalysisException] {
@@ -393,7 +394,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def convertToDatasourceTable(
catalog: SessionCatalog,
tableIdent: TableIdentifier): Unit = {
- catalog.alterTable(catalog.getTable(tableIdent).copy(
+ catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
properties = Map("spark.sql.sources.provider" -> "csv")))
}
@@ -407,15 +408,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
// Verify that the location is set to the expected string
def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
- .getOrElse { catalog.getTable(tableIdent).storage }
+ .getOrElse { catalog.getTableMetadata(tableIdent).storage }
if (isDatasourceTable) {
if (spec.isDefined) {
assert(storageFormat.serdeProperties.isEmpty)
@@ -467,8 +468,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
- assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -482,22 +483,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e2.getMessage.contains("datasource"))
} else {
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
- assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
- assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
- assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
- assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+ assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 98a5998d03..b1156fb3e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -292,6 +292,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.getFunction(db, funcName)
}
+ override def functionExists(db: String, funcName: String): Boolean = withClient {
+ client.functionExists(db, funcName)
+ }
+
override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
client.listFunctions(db, pattern)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee56f9d75d..94794b1572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -232,6 +232,11 @@ private[hive] trait HiveClient {
/** Return an existing function in the database, or None if it doesn't exist. */
def getFunctionOption(db: String, name: String): Option[CatalogFunction]
+ /** Return whether a function exists in the specified database. */
+ final def functionExists(db: String, name: String): Boolean = {
+ getFunctionOption(db, name).isDefined
+ }
+
/** Return the names of all functions that match the given pattern in the database. */
def listFunctions(db: String, pattern: String): Seq[String]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1f66fbfd85..d0eb9ddf50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -30,7 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
@@ -559,7 +559,11 @@ private[hive] class HiveClientImpl(
override def getFunctionOption(
db: String,
name: String): Option[CatalogFunction] = withHiveState {
- Option(client.getFunction(db, name)).map(fromHiveFunction)
+ try {
+ Option(client.getFunction(db, name)).map(fromHiveFunction)
+ } catch {
+ case he: HiveException => None
+ }
}
override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
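
With getFunctionOption swallowing HiveException, the final functionExists helper added to HiveClient (see above) works for every supported Hive version without extra shims. A small sketch, assuming `client` is some HiveClient instance; the database and function names are made up:

    val exists: Boolean = client.functionExists("default", "example_max")
    // equivalent to: client.getFunctionOption("default", "example_max").isDefined,
    // which returns None (hence false) when Hive reports the function as missing
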
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 6967395613..ada8621d07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable =
+ sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +145,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
|AS SELECT 1 AS d1, "val_1" AS d2
""".stripMargin)
- val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+ val hiveTable =
+ sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd2129375d..c5417b06a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -354,7 +354,7 @@ object PermanentHiveUDFTest2 extends Logging {
FunctionIdentifier("example_max"),
"org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
("JAR" -> jar) :: Nil)
- hiveContext.sessionState.catalog.createFunction(function)
+ hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
val source =
hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
source.registerTempTable("sourceTable")