aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala107
1 files changed, 1 insertions, 106 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index efaa052370..784b018353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
import org.apache.hadoop.hive.ql.exec._
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -31,118 +30,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, O
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{analysis, InternalRow}
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types._
-private[hive] class HiveFunctionRegistry(
- underlying: analysis.FunctionRegistry,
- executionHive: HiveClientImpl)
- extends analysis.FunctionRegistry with HiveInspectors {
-
- def getFunctionInfo(name: String): FunctionInfo = {
- // Hive Registry need current database to lookup function
- // TODO: the current database of executionHive should be consistent with metadataHive
- executionHive.withHiveState {
- FunctionRegistry.getFunctionInfo(name)
- }
- }
-
- override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- Try(underlying.lookupFunction(name, children)).getOrElse {
- // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
- // not always serializable.
- val functionInfo: FunctionInfo =
- Option(getFunctionInfo(name.toLowerCase)).getOrElse(
- throw new AnalysisException(s"undefined function $name"))
-
- val functionClassName = functionInfo.getFunctionClass.getName
-
- // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions
- // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
- // catch the exception and throw AnalysisException instead.
- try {
- if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(
- name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (
- classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(
- name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children)
- udtf.elementTypes // Force it to check input data types.
- udtf
- } else {
- throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}")
- }
- } catch {
- case analysisException: AnalysisException =>
- // If the exception is an AnalysisException, just throw it.
- throw analysisException
- case throwable: Throwable =>
- // If there is any other error, we throw an AnalysisException.
- val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
- s"because: ${throwable.getMessage}."
- throw new AnalysisException(errorMessage)
- }
- }
- }
-
- override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
- : Unit = underlying.registerFunction(name, info, builder)
-
- /* List all of the registered function names. */
- override def listFunction(): Seq[String] = {
- (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted
- }
-
- /* Get the class of the registered function by specified name. */
- override def lookupFunction(name: String): Option[ExpressionInfo] = {
- underlying.lookupFunction(name).orElse(
- Try {
- val info = getFunctionInfo(name)
- val annotation = info.getFunctionClass.getAnnotation(classOf[Description])
- if (annotation != null) {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- annotation.name(),
- annotation.value(),
- annotation.extended()))
- } else {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- name,
- null,
- null))
- }
- }.getOrElse(None))
- }
-}
-
private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {