aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-04-05 12:27:06 -0700
committerAndrew Or <andrew@databricks.com>2016-04-05 12:27:06 -0700
commit72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (patch)
tree8767dc0a6e575dac7cbd87ccbb5c01f504d6b851 /sql/hive/src/main
parentbc36df127d3b9f56b4edaeb5eca7697d4aef761a (diff)
downloadspark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.gz
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.bz2
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.zip
[SPARK-14123][SPARK-14384][SQL] Handle CreateFunction/DropFunction
## What changes were proposed in this pull request? This PR implements CreateFunction and DropFunction commands. Besides implementing these two commands, we also change how to manage functions. Here are the main changes. * `FunctionRegistry` will be a container to store all functions builders and it will not actively load any functions. Because of this change, we do not need to maintain a separate registry for HiveContext. So, `HiveFunctionRegistry` is deleted. * SessionCatalog takes care the job of loading a function if this function is not in the `FunctionRegistry` but its metadata is stored in the external catalog. For this case, SessionCatalog will (1) load the metadata from the external catalog, (2) load all needed resources (i.e. jars and files), (3) create a function builder based on the function definition, (4) register the function builder in the `FunctionRegistry`. * A `UnresolvedGenerator` is created. So, the parser will not need to call `FunctionRegistry` directly during parsing, which is not a good time to create a Hive UDTF. In the analysis phase, we will resolve `UnresolvedGenerator`. This PR is based on viirya's https://github.com/apache/spark/pull/12036/ ## How was this patch tested? Existing tests and new tests. ## TODOs [x] Self-review [x] Cleanup [x] More tests for create/drop functions (we need to more tests for permanent functions). [ ] File JIRAs for all TODOs [x] Standardize the error message when a function does not exist. Author: Yin Huai <yhuai@databricks.com> Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #12117 from yhuai/function.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala11
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala143
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala20
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala119
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala16
7 files changed, 184 insertions, 160 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 11205ae67c..98a5998d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -272,7 +272,12 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
- client.createFunction(db, funcDefinition)
+ // Hive's metastore is case insensitive. However, Hive's createFunction does
+ // not normalize the function name (unlike the getFunction part). So,
+ // we are normalizing the function name.
+ val functionName = funcDefinition.identifier.funcName.toLowerCase
+ val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
+ client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
override def dropFunction(db: String, name: String): Unit = withClient {
@@ -283,10 +288,6 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.renameFunction(db, oldName, newName)
}
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
- client.alterFunction(db, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
client.getFunction(db, funcName)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index dfbf22cc47..d315f39a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,27 +17,39 @@
package org.apache.spark.sql.hive
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
-class HiveSessionCatalog(
+private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
client: HiveClient,
context: HiveContext,
+ functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf)
- extends SessionCatalog(externalCatalog, functionRegistry, conf) {
+ extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
override def setCurrentDatabase(db: String): Unit = {
super.setCurrentDatabase(db)
@@ -112,4 +124,129 @@ class HiveSessionCatalog(
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
}
+ override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+ makeFunctionBuilder(funcName, Utils.classForName(className))
+ }
+
+ /**
+ * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ */
+ private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+ // When we instantiate hive UDF wrapper class, we may throw exception if the input
+ // expressions don't satisfy the hive UDF, such as type mismatch, input number
+ // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+ (children: Seq[Expression]) => {
+ try {
+ if (classOf[UDF].isAssignableFrom(clazz)) {
+ val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+ val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(
+ name,
+ new HiveFunctionWrapper(clazz.getName),
+ children,
+ isUDAFBridgeRequired = true)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+ val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udtf.elementTypes // Force it to check input data types.
+ udtf
+ } else {
+ throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+ }
+ } catch {
+ case ae: AnalysisException =>
+ throw ae
+ case NonFatal(e) =>
+ val analysisException =
+ new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+ analysisException.setStackTrace(e.getStackTrace)
+ throw analysisException
+ }
+ }
+ }
+
+ // We have a list of Hive built-in functions that we do not support. So, we will check
+ // Hive's function registry and lazily load needed functions into our own function registry.
+ // Those Hive built-in functions are
+ // assert_true, collect_list, collect_set, compute_stats, context_ngrams, create_union,
+ // current_user ,elt, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+ // histogram_numeric, in_file, index, inline, java_method, map_keys, map_values,
+ // matchpath, ngrams, noop, noopstreaming, noopwithmap, noopwithmapstreaming,
+ // parse_url, parse_url_tuple, percentile, percentile_approx, posexplode, reflect, reflect2,
+ // regexp, sentences, stack, std, str_to_map, windowingtablefunction, xpath, xpath_boolean,
+ // xpath_double, xpath_float, xpath_int, xpath_long, xpath_number,
+ // xpath_short, and xpath_string.
+ override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
+ // if (super.functionExists(name)) {
+ // super.lookupFunction(name, children)
+ // } else {
+ // // This function is a Hive builtin function.
+ // ...
+ // }
+ Try(super.lookupFunction(name, children)) match {
+ case Success(expr) => expr
+ case Failure(error) =>
+ if (functionRegistry.functionExists(name)) {
+ // If the function actually exists in functionRegistry, it means that there is an
+ // error when we create the Expression using the given children.
+ // We need to throw the original exception.
+ throw error
+ } else {
+ // This function is not in functionRegistry, let's try to load it as a Hive's
+ // built-in function.
+ // Hive is case insensitive.
+ val functionName = name.toLowerCase
+ // TODO: This may not really work for current_user because current_user is not evaluated
+ // with session info.
+ // We do not need to use executionHive at here because we only load
+ // Hive's builtin functions, which do not need current db.
+ val functionInfo = {
+ try {
+ Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+ failFunctionLookup(name))
+ } catch {
+ // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+ // we are failing to load a Hive builtin function, which means that
+ // the given function is not a Hive builtin function.
+ case NonFatal(e) => failFunctionLookup(name)
+ }
+ }
+ val className = functionInfo.getFunctionClass.getName
+ val builder = makeFunctionBuilder(functionName, className)
+ // Put this Hive built-in function to our function registry.
+ val info = new ExpressionInfo(className, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(functionName, children)
+ }
+ }
+ }
+
+ // Pre-load a few commonly used Hive built-in functions.
+ HiveSessionCatalog.preloadedHiveBuiltinFunctions.foreach {
+ case (functionName, clazz) =>
+ val builder = makeFunctionBuilder(functionName, clazz)
+ val info = new ExpressionInfo(clazz.getCanonicalName, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ }
+}
+
+private[sql] object HiveSessionCatalog {
+ // This is the list of Hive's built-in functions that are commonly used and we want to
+ // pre-load when we create the FunctionRegistry.
+ val preloadedHiveBuiltinFunctions =
+ ("collect_set", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet]) ::
+ ("collect_list", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList]) :: Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 829afa8432..cff24e28fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -36,25 +36,23 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
}
/**
- * Internal catalog for managing functions registered by the user.
- * Note that HiveUDFs will be overridden by functions registered in this context.
- */
- override lazy val functionRegistry: FunctionRegistry = {
- new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
- }
-
- /**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
- new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, functionRegistry, conf)
+ new HiveSessionCatalog(
+ ctx.hiveCatalog,
+ ctx.metadataHive,
+ ctx,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
}
/**
* An analyzer that uses the Hive metastore.
*/
override lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a31178e347..1f66fbfd85 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,13 +21,14 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -37,6 +38,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
@@ -611,6 +613,9 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
+ new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+ }
new HiveFunction(
f.identifier.funcName,
db,
@@ -619,12 +624,21 @@ private[hive] class HiveClientImpl(
PrincipalType.USER,
(System.currentTimeMillis / 1000).toInt,
FunctionType.JAVA,
- List.empty[ResourceUri].asJava)
+ resourceUris.asJava)
}
private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
- new CatalogFunction(name, hf.getClassName)
+ val resources = hf.getResourceUris.asScala.map { uri =>
+ val resourceType = uri.getResourceType() match {
+ case ResourceType.ARCHIVE => "archive"
+ case ResourceType.FILE => "file"
+ case ResourceType.JAR => "jar"
+ case r => throw new AnalysisException(s"Unknown resource type: $r")
+ }
+ (resourceType, uri.getUri())
+ }
+ new CatalogFunction(name, hf.getClassName, resources)
}
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 55e69f99a4..c6c0b2ca59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -21,13 +21,13 @@ import scala.collection.JavaConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.parse.EximUtil
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -278,19 +278,6 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
}
/**
- * Create a [[Generator]]. Override this method in order to support custom Generators.
- */
- override protected def withGenerator(
- name: String,
- expressions: Seq[Expression],
- ctx: LateralViewContext): Generator = {
- val info = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse {
- throw new ParseException(s"Couldn't find Generator function '$name'", ctx)
- }
- HiveGenericUDTF(name, new HiveFunctionWrapper(info.getFunctionClass.getName), expressions)
- }
-
- /**
* Create a [[HiveScriptIOSchema]].
*/
override protected def withScriptIOSchema(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 5ada3d5598..784b018353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
import org.apache.hadoop.hive.ql.exec._
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -31,130 +30,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, O
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{analysis, InternalRow}
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types._
-private[hive] class HiveFunctionRegistry(
- underlying: analysis.FunctionRegistry,
- executionHive: HiveClientImpl)
- extends analysis.FunctionRegistry with HiveInspectors {
-
- def getFunctionInfo(name: String): FunctionInfo = {
- // Hive Registry need current database to lookup function
- // TODO: the current database of executionHive should be consistent with metadataHive
- executionHive.withHiveState {
- FunctionRegistry.getFunctionInfo(name)
- }
- }
-
- override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- Try(underlying.lookupFunction(name, children)).getOrElse {
- // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
- // not always serializable.
- val functionInfo: FunctionInfo =
- Option(getFunctionInfo(name.toLowerCase)).getOrElse(
- throw new AnalysisException(s"undefined function $name"))
-
- val functionClassName = functionInfo.getFunctionClass.getName
-
- // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions
- // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
- // catch the exception and throw AnalysisException instead.
- try {
- if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(
- name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (
- classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(
- name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children)
- udtf.elementTypes // Force it to check input data types.
- udtf
- } else {
- throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}")
- }
- } catch {
- case analysisException: AnalysisException =>
- // If the exception is an AnalysisException, just throw it.
- throw analysisException
- case throwable: Throwable =>
- // If there is any other error, we throw an AnalysisException.
- val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
- s"because: ${throwable.getMessage}."
- val analysisException = new AnalysisException(errorMessage)
- analysisException.setStackTrace(throwable.getStackTrace)
- throw analysisException
- }
- }
- }
-
- override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
- : Unit = underlying.registerFunction(name, info, builder)
-
- /* List all of the registered function names. */
- override def listFunction(): Seq[String] = {
- (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted
- }
-
- /* Get the class of the registered function by specified name. */
- override def lookupFunction(name: String): Option[ExpressionInfo] = {
- underlying.lookupFunction(name).orElse(
- Try {
- val info = getFunctionInfo(name)
- val annotation = info.getFunctionClass.getAnnotation(classOf[Description])
- if (annotation != null) {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- annotation.name(),
- annotation.value(),
- annotation.extended()))
- } else {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- name,
- null,
- null))
- }
- }.getOrElse(None))
- }
-
- override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = {
- underlying.lookupFunctionBuilder(name)
- }
-
- // Note: This does not drop functions stored in the metastore
- override def dropFunction(name: String): Boolean = {
- underlying.dropFunction(name)
- }
-
-}
-
private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9393302355..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -201,8 +201,13 @@ class TestHiveContext private[hive](
}
override lazy val functionRegistry = {
- new TestHiveFunctionRegistry(
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+ // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
+ // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
+ val fr = new TestHiveFunctionRegistry
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
+ case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ }
+ fr
}
}
@@ -528,19 +533,18 @@ class TestHiveContext private[hive](
}
-private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
- extends HiveFunctionRegistry(fr, client) {
+private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
private val removedFunctions =
collection.mutable.ArrayBuffer.empty[(String, (ExpressionInfo, FunctionBuilder))]
def unregisterFunction(name: String): Unit = {
- fr.functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
+ functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
}
def restore(): Unit = {
removedFunctions.foreach {
- case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ case (name, (info, builder)) => registerFunction(name, info, builder)
}
}
}