author     gatorsmile <gatorsmile@gmail.com>    2016-09-02 22:31:01 +0800
committer  Wenchen Fan <wenchen@databricks.com> 2016-09-02 22:31:01 +0800
commit     247a4faf06c1dd47a6543c56929cd0182a03e106 (patch)
tree       c9a90898083b72494aa4a5fc0ce0de12fc0c4def /sql
parent     7ee24dac8e779f6a9bf45371fdc2be83fb679cb2 (diff)
[SPARK-16935][SQL] Verification of Function-related ExternalCatalog APIs
### What changes were proposed in this pull request?

Function-related `HiveExternalCatalog` APIs do not have enough verification logic. After this PR, `HiveExternalCatalog` and `InMemoryCatalog` are consistent in their error handling. For example, below is the exception we got when calling `renameFunction` before this change:

```
15:13:40.369 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db1, returning NoSuchObjectException
15:13:40.377 WARN org.apache.hadoop.hive.metastore.ObjectStore: Failed to get database db2, returning NoSuchObjectException
15:13:40.739 ERROR DataNucleus.Datastore.Persist: Update of object "org.apache.hadoop.hive.metastore.model.MFunction205629e9" using statement "UPDATE FUNCS SET FUNC_NAME=? WHERE FUNC_ID=?" failed : org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException: The statement was aborted because it would have caused a duplicate key value in a unique or primary key constraint or unique index identified by 'UNIQUEFUNCTION' defined on 'FUNCS'.
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
```

### How was this patch tested?

Improved the existing test cases to check whether the error messages are right.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14521 from gatorsmile/functionChecking.
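As a quick illustration (not part of the patch), the sketch below shows the behavior the updated test cases assert, assuming the `newBasicCatalog()` and `newFunc()` fixtures and ScalaTest's `intercept` from `ExternalCatalogSuite`. With the verification added by this PR, the same expectations hold for both `InMemoryCatalog` and `HiveExternalCatalog`:

```
// `newBasicCatalog()` provides a catalog whose database "db2" already
// contains the function "func1" (see ExternalCatalogSuite).
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}

val catalog = newBasicCatalog()

// Missing database -> NoSuchDatabaseException, not a backend-specific warning.
intercept[NoSuchDatabaseException] {
  catalog.createFunction("does_not_exist", newFunc())
}

// Missing function -> NoSuchFunctionException, not a client-level failure.
intercept[NoSuchFunctionException] {
  catalog.dropFunction("db2", "does_not_exist")
}

// Renaming onto an existing function name -> FunctionAlreadyExistsException,
// not the Derby UNIQUEFUNCTION constraint violation shown above.
catalog.createFunction("db2", newFunc("func2", Some("db2")))
intercept[FunctionAlreadyExistsException] {
  catalog.renameFunction("db2", "func1", "func2")
}
```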
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala      | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala      | 19
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala                  |  8
4 files changed, 34 insertions, 28 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 27e1810814..df72baaba2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
/**
@@ -38,6 +38,18 @@ abstract class ExternalCatalog {
}
}
+ protected def requireFunctionExists(db: String, funcName: String): Unit = {
+ if (!functionExists(db, funcName)) {
+ throw new NoSuchFunctionException(db = db, func = funcName)
+ }
+ }
+
+ protected def requireFunctionNotExists(db: String, funcName: String): Unit = {
+ if (functionExists(db, funcName)) {
+ throw new FunctionAlreadyExistsException(db = db, func = funcName)
+ }
+ }
+
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index b55ddcb54b..4e361a536d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -63,18 +63,6 @@ class InMemoryCatalog(
catalog(db).tables(table).partitions.contains(spec)
}
- private def requireFunctionExists(db: String, funcName: String): Unit = {
- if (!functionExists(db, funcName)) {
- throw new NoSuchFunctionException(db = db, func = funcName)
- }
- }
-
- private def requireFunctionNotExists(db: String, funcName: String): Unit = {
- if (functionExists(db, funcName)) {
- throw new FunctionAlreadyExistsException(db = db, func = funcName)
- }
- }
-
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new NoSuchTableException(db = db, table = table)
@@ -474,11 +462,8 @@ class InMemoryCatalog(
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
- if (functionExists(db, func.identifier.funcName)) {
- throw new FunctionAlreadyExistsException(db = db, func = func.identifier.funcName)
- } else {
- catalog(db).functions.put(func.identifier.funcName, func)
- }
+ requireFunctionNotExists(db, func.identifier.funcName)
+ catalog(db).functions.put(func.identifier.funcName, func)
}
override def dropFunction(db: String, funcName: String): Unit = synchronized {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 19f8665383..f283f4287c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -450,14 +451,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("create function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.createFunction("does_not_exist", newFunc())
}
}
test("create function that already exists") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[FunctionAlreadyExistsException] {
catalog.createFunction("db2", newFunc("func1"))
}
}
@@ -471,14 +472,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("drop function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.dropFunction("does_not_exist", "something")
}
}
test("drop function that does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.dropFunction("db2", "does_not_exist")
}
}
@@ -488,14 +489,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(catalog.getFunction("db2", "func1") ==
CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
Seq.empty[FunctionResource]))
- intercept[AnalysisException] {
+ intercept[NoSuchFunctionException] {
catalog.getFunction("db2", "does_not_exist")
}
}
test("get function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.getFunction("does_not_exist", "func1")
}
}
@@ -505,15 +506,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val newName = "funcky"
assert(catalog.getFunction("db2", "func1").className == funcClass)
catalog.renameFunction("db2", "func1", newName)
- intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
+ intercept[NoSuchFunctionException] { catalog.getFunction("db2", "func1") }
assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
assert(catalog.getFunction("db2", newName).className == funcClass)
- intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
+ intercept[NoSuchFunctionException] { catalog.renameFunction("db2", "does_not_exist", "me") }
}
test("rename function when database does not exist") {
val catalog = newBasicCatalog()
- intercept[AnalysisException] {
+ intercept[NoSuchDatabaseException] {
catalog.renameFunction("does_not_exist", "func1", "func5")
}
}
@@ -521,7 +522,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("rename function when new function already exists") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2", Some("db2")))
- intercept[AnalysisException] {
+ intercept[FunctionAlreadyExistsException] {
catalog.renameFunction("db2", "func1", "func2")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ed87ac3c3e..8541ae2322 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -570,31 +570,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
+ requireDbExists(db)
// Hive's metastore is case insensitive. However, Hive's createFunction does
// not normalize the function name (unlike the getFunction part). So,
// we are normalizing the function name.
val functionName = funcDefinition.identifier.funcName.toLowerCase
+ requireFunctionNotExists(db, functionName)
val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
override def dropFunction(db: String, name: String): Unit = withClient {
+ requireFunctionExists(db, name)
client.dropFunction(db, name)
}
override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+ requireFunctionExists(db, oldName)
+ requireFunctionNotExists(db, newName)
client.renameFunction(db, oldName, newName)
}
override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
+ requireFunctionExists(db, funcName)
client.getFunction(db, funcName)
}
override def functionExists(db: String, funcName: String): Boolean = withClient {
+ requireDbExists(db)
client.functionExists(db, funcName)
}
override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
+ requireDbExists(db)
client.listFunctions(db, pattern)
}