author    Xiao Li <gatorsmile@gmail.com>    2017-04-12 09:01:26 -0700
committer Xiao Li <gatorsmile@gmail.com>    2017-04-12 09:01:26 -0700
commit    504e62e2f4b7df7e002ea014a855cebe1ff95193 (patch)
tree      e24fb9939cf38f1be5590882175a7d33961b81b8
parent    ceaf77ae43a14e993ac6d1ff34b50256eacd6abb (diff)
[SPARK-20303][SQL] Rename createTempFunction to registerFunction
### What changes were proposed in this pull request?

The session catalog API `createTempFunction` is used by Hive built-in functions, persistent functions, and temporary functions, so its name is confusing. This PR renames it to `registerFunction`. It also moves the construction of `FunctionBuilder` and `ExpressionInfo` into the new `registerFunction`, instead of duplicating that logic at every call site. The remaining function-related APIs will be cleaned up in follow-up PRs.

### How was this patch tested?

Existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17615 from gatorsmile/cleanupCreateTempFunction.
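For orientation before the hunks below, here is a minimal sketch of the call-site migration this patch implies. It is an illustration, not part of the commit: `my_func` and `com.example.MyUDF` are hypothetical names, and it assumes the test-only single-argument `SessionCatalog` constructor backed by `InMemoryCatalog` that the catalyst test suites use.

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.Expression

object RegisterFunctionSketch {
  def main(args: Array[String]): Unit = {
    // A bare SessionCatalog over the in-memory external catalog (no Hive),
    // assuming the test-only convenience constructor.
    val catalog = new SessionCatalog(new InMemoryCatalog)

    // Before this patch, callers assembled three pieces of state themselves:
    //   val info = new ExpressionInfo(className, name)
    //   val builder = catalog.makeFunctionBuilder(name, className)
    //   catalog.createTempFunction(name, info, builder, ignoreIfExists = false)

    // After this patch, the metadata travels as one CatalogFunction;
    // a None database marks the function as temporary.
    val func = CatalogFunction(
      FunctionIdentifier("my_func", database = None), // hypothetical name
      "com.example.MyUDF",                            // hypothetical class
      resources = Nil)

    // An explicit builder, as the updated test suites pass one. If omitted,
    // registerFunction falls back to makeFunctionBuilder, which the base
    // SessionCatalog leaves unsupported (use sqlContext.udf.register instead).
    val builder: FunctionBuilder = (children: Seq[Expression]) => children.head

    catalog.registerFunction(func, ignoreIfExists = false, functionBuilder = Some(builder))
  }
}
```

Passing the builder explicitly mirrors the updated test suites; production call sites such as `CreateFunctionCommand` omit the third argument and rely on `makeFunctionBuilder`, as the hunks below show.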
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala          |  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala                  | 31
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala             | 40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala                          |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                                |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala                              | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala | 10
7 files changed, 53 insertions(+), 63 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index ec56fe7729..57f7a80bed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -44,6 +44,3 @@ class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[Tabl
class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")
-
-class TempFunctionAlreadyExistsException(func: String)
- extends AnalysisException(s"Temporary function '$func' already exists")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index faedf5f91c..1417bccf65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1050,7 +1050,7 @@ class SessionCatalog(
*
* This performs reflection to decide what type of [[Expression]] to return in the builder.
*/
- def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+ protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
// TODO: at least support UDAFs here
throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
}
@@ -1064,18 +1064,20 @@ class SessionCatalog(
}
/**
- * Create a temporary function.
- * This assumes no database is specified in `funcDefinition`.
+ * Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
*/
- def createTempFunction(
- name: String,
- info: ExpressionInfo,
- funcDefinition: FunctionBuilder,
- ignoreIfExists: Boolean): Unit = {
- if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
- throw new TempFunctionAlreadyExistsException(name)
+ def registerFunction(
+ funcDefinition: CatalogFunction,
+ ignoreIfExists: Boolean,
+ functionBuilder: Option[FunctionBuilder] = None): Unit = {
+ val func = funcDefinition.identifier
+ if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists) {
+ throw new AnalysisException(s"Function $func already exists")
}
- functionRegistry.registerFunction(name, info, funcDefinition)
+ val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
+ val builder =
+ functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className))
+ functionRegistry.registerFunction(func.unquotedString, info, builder)
}
/**
@@ -1180,12 +1182,7 @@ class SessionCatalog(
// catalog. So, it is possible that qualifiedName is not exactly the same as
// catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
// At here, we preserve the input from the user.
- val info = new ExpressionInfo(
- catalogFunction.className,
- qualifiedName.database.orNull,
- qualifiedName.funcName)
- val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className)
- createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false)
+ registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false)
// Now, we need to create the Expression.
functionRegistry.lookupFunction(qualifiedName.unquotedString, children)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9ba846fb25..be8903000a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1162,10 +1162,10 @@ abstract class SessionCatalogSuite extends PlanTest {
withBasicCatalog { catalog =>
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
- val info1 = new ExpressionInfo("tempFunc1", "temp1")
- val info2 = new ExpressionInfo("tempFunc2", "temp2")
- catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
- catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
+ catalog.registerFunction(
+ newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
+ catalog.registerFunction(
+ newFunc("temp2", None), ignoreIfExists = false, functionBuilder = Some(tempFunc2))
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1))
assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3))
@@ -1174,13 +1174,15 @@ abstract class SessionCatalogSuite extends PlanTest {
catalog.lookupFunction(FunctionIdentifier("temp3"), arguments)
}
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
- val info3 = new ExpressionInfo("tempFunc3", "temp1")
// Temporary function already exists
- intercept[TempFunctionAlreadyExistsException] {
- catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
- }
+ val e = intercept[AnalysisException] {
+ catalog.registerFunction(
+ newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc3))
+ }.getMessage
+ assert(e.contains("Function temp1 already exists"))
// Temporary function is overridden
- catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
+ catalog.registerFunction(
+ newFunc("temp1", None), ignoreIfExists = true, functionBuilder = Some(tempFunc3))
assert(
catalog.lookupFunction(
FunctionIdentifier("temp1"), arguments) === Literal(arguments.length))
@@ -1193,8 +1195,8 @@ abstract class SessionCatalogSuite extends PlanTest {
assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
val tempFunc1 = (e: Seq[Expression]) => e.head
- val info1 = new ExpressionInfo("tempFunc1", "temp1")
- catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
+ catalog.registerFunction(
+ newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
// Returns true when the function is temporary
assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1")))
@@ -1243,9 +1245,9 @@ abstract class SessionCatalogSuite extends PlanTest {
test("drop temp function") {
withBasicCatalog { catalog =>
- val info = new ExpressionInfo("tempFunc", "func1")
val tempFunc = (e: Seq[Expression]) => e.head
- catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
+ catalog.registerFunction(
+ newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc))
val arguments = Seq(Literal(1), Literal(2), Literal(3))
assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1284,9 +1286,9 @@ abstract class SessionCatalogSuite extends PlanTest {
test("lookup temp function") {
withBasicCatalog { catalog =>
- val info1 = new ExpressionInfo("tempFunc1", "func1")
val tempFunc1 = (e: Seq[Expression]) => e.head
- catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
+ catalog.registerFunction(
+ newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1))
assert(catalog.lookupFunction(
FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
@@ -1298,14 +1300,14 @@ abstract class SessionCatalogSuite extends PlanTest {
test("list functions") {
withBasicCatalog { catalog =>
- val info1 = new ExpressionInfo("tempFunc1", "func1")
- val info2 = new ExpressionInfo("tempFunc2", "yes_me")
+ val funcMeta1 = newFunc("func1", None)
+ val funcMeta2 = newFunc("yes_me", None)
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
- catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
- catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
+ catalog.registerFunction(funcMeta1, ignoreIfExists = false, functionBuilder = Some(tempFunc1))
+ catalog.registerFunction(funcMeta2, ignoreIfExists = false, functionBuilder = Some(tempFunc2))
assert(catalog.listFunctions("db1", "*").map(_._1).toSet ==
Set(FunctionIdentifier("func1"),
FunctionIdentifier("yes_me")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 5687f93324..e0d0029369 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -51,6 +51,7 @@ case class CreateFunctionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
+ val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
if (isTemp) {
if (databaseName.isDefined) {
throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
@@ -59,17 +60,13 @@ case class CreateFunctionCommand(
// We first load resources and then put the builder in the function registry.
// Please note that it is allowed to overwrite an existing temp function.
catalog.loadFunctionResources(resources)
- val info = new ExpressionInfo(className, functionName)
- val builder = catalog.makeFunctionBuilder(functionName, className)
- catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ catalog.registerFunction(func, ignoreIfExists = false)
} else {
// For a permanent, we will store the metadata into underlying external catalog.
// This function will be loaded into the FunctionRegistry when a query uses it.
// We do not load it into FunctionRegistry right now.
// TODO: should we also parse "IF NOT EXISTS"?
- catalog.createFunction(
- CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
- ignoreIfExists = false)
+ catalog.createFunction(func, ignoreIfExists = false)
}
Seq.empty[Row]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 6469e501c1..8f9c52cb1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -75,9 +75,10 @@ class CatalogSuite
}
private def createTempFunction(name: String): Unit = {
- val info = new ExpressionInfo("className", name)
val tempFunc = (e: Seq[Expression]) => e.head
- sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false)
+ val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil)
+ sessionCatalog.registerFunction(
+ funcMeta, ignoreIfExists = false, functionBuilder = Some(tempFunc))
}
private def dropFunction(name: String, db: Option[String] = None): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index c917f110b9..377d4f2473 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
@@ -124,13 +124,6 @@ private[sql] class HiveSessionCatalog(
}
private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
- // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
- // if (super.functionExists(name)) {
- // super.lookupFunction(name, children)
- // } else {
- // // This function is a Hive builtin function.
- // ...
- // }
val database = name.database.map(formatDatabaseName)
val funcName = name.copy(database = database)
Try(super.lookupFunction(funcName, children)) match {
@@ -164,10 +157,11 @@ private[sql] class HiveSessionCatalog(
}
}
val className = functionInfo.getFunctionClass.getName
- val builder = makeFunctionBuilder(functionName, className)
+ val functionIdentifier =
+ FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database)
+ val func = CatalogFunction(functionIdentifier, className, Nil)
// Put this Hive built-in function to our function registry.
- val info = new ExpressionInfo(className, functionName)
- createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ registerFunction(func, ignoreIfExists = false)
// Now, we need to create the Expression.
functionRegistry.lookupFunction(functionName, children)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
index 197110f491..73383ae4d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
@@ -22,7 +22,9 @@ import scala.concurrent.duration._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox
import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo, Literal}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.sql.hive.execution.TestingTypedCount
@@ -217,9 +219,9 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingle
private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = {
val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
- val builder = sessionCatalog.makeFunctionBuilder(functionName, clazz.getName)
- val info = new ExpressionInfo(clazz.getName, functionName)
- sessionCatalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ val functionIdentifier = FunctionIdentifier(functionName, database = None)
+ val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil)
+ sessionCatalog.registerFunction(func, ignoreIfExists = false)
}
private def percentile_approx(