diff options
Diffstat (limited to 'sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 107 |
1 files changed, 1 insertions, 106 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index efaa052370..784b018353 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.util.Try import org.apache.hadoop.hive.ql.exec._ import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} @@ -31,118 +30,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, O import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{analysis, InternalRow} -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.hive.HiveShim._ -import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types._ -private[hive] class HiveFunctionRegistry( - underlying: analysis.FunctionRegistry, - executionHive: HiveClientImpl) - extends analysis.FunctionRegistry with HiveInspectors { - - def getFunctionInfo(name: String): FunctionInfo = { - // Hive Registry need current database to lookup function - // TODO: the current database of executionHive should be consistent with metadataHive - executionHive.withHiveState { - FunctionRegistry.getFunctionInfo(name) - } - } - - override def lookupFunction(name: String, children: Seq[Expression]): Expression = { - Try(underlying.lookupFunction(name, children)).getOrElse { - // We only look it up to see if it exists, but do not include it in the HiveUDF since it is - // not always serializable. - val functionInfo: FunctionInfo = - Option(getFunctionInfo(name.toLowerCase)).getOrElse( - throw new AnalysisException(s"undefined function $name")) - - val functionClassName = functionInfo.getFunctionClass.getName - - // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions - // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we - // catch the exception and throw AnalysisException instead. - try { - if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) { - val udf = HiveGenericUDF( - name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children) - udf.dataType // Force it to check input data types. - udf - } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) { - val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children) - udf.dataType // Force it to check input data types. - udf - } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) { - val udf = HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children) - udf.dataType // Force it to check input data types. - udf - } else if ( - classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) { - val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children) - udaf.dataType // Force it to check input data types. - udaf - } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) { - val udaf = HiveUDAFFunction( - name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true) - udaf.dataType // Force it to check input data types. - udaf - } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) { - val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children) - udtf.elementTypes // Force it to check input data types. - udtf - } else { - throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}") - } - } catch { - case analysisException: AnalysisException => - // If the exception is an AnalysisException, just throw it. - throw analysisException - case throwable: Throwable => - // If there is any other error, we throw an AnalysisException. - val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " + - s"because: ${throwable.getMessage}." - throw new AnalysisException(errorMessage) - } - } - } - - override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder) - : Unit = underlying.registerFunction(name, info, builder) - - /* List all of the registered function names. */ - override def listFunction(): Seq[String] = { - (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted - } - - /* Get the class of the registered function by specified name. */ - override def lookupFunction(name: String): Option[ExpressionInfo] = { - underlying.lookupFunction(name).orElse( - Try { - val info = getFunctionInfo(name) - val annotation = info.getFunctionClass.getAnnotation(classOf[Description]) - if (annotation != null) { - Some(new ExpressionInfo( - info.getFunctionClass.getCanonicalName, - annotation.name(), - annotation.value(), - annotation.extended())) - } else { - Some(new ExpressionInfo( - info.getFunctionClass.getCanonicalName, - name, - null, - null)) - } - }.getOrElse(None)) - } -} - private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression with HiveInspectors with CodegenFallback with Logging { |