aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-04-05 12:27:06 -0700
committerAndrew Or <andrew@databricks.com>2016-04-05 12:27:06 -0700
commit72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (patch)
tree8767dc0a6e575dac7cbd87ccbb5c01f504d6b851 /sql/hive
parentbc36df127d3b9f56b4edaeb5eca7697d4aef761a (diff)
downloadspark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.gz
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.bz2
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.zip
[SPARK-14123][SPARK-14384][SQL] Handle CreateFunction/DropFunction
## What changes were proposed in this pull request? This PR implements CreateFunction and DropFunction commands. Besides implementing these two commands, we also change how to manage functions. Here are the main changes. * `FunctionRegistry` will be a container to store all functions builders and it will not actively load any functions. Because of this change, we do not need to maintain a separate registry for HiveContext. So, `HiveFunctionRegistry` is deleted. * SessionCatalog takes care the job of loading a function if this function is not in the `FunctionRegistry` but its metadata is stored in the external catalog. For this case, SessionCatalog will (1) load the metadata from the external catalog, (2) load all needed resources (i.e. jars and files), (3) create a function builder based on the function definition, (4) register the function builder in the `FunctionRegistry`. * A `UnresolvedGenerator` is created. So, the parser will not need to call `FunctionRegistry` directly during parsing, which is not a good time to create a Hive UDTF. In the analysis phase, we will resolve `UnresolvedGenerator`. This PR is based on viirya's https://github.com/apache/spark/pull/12036/ ## How was this patch tested? Existing tests and new tests. ## TODOs [x] Self-review [x] Cleanup [x] More tests for create/drop functions (we need to more tests for permanent functions). [ ] File JIRAs for all TODOs [x] Standardize the error message when a function does not exist. Author: Yin Huai <yhuai@databricks.com> Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #12117 from yhuai/function.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala11
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala143
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala18
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala20
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala119
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala16
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala165
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala167
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala74
12 files changed, 574 insertions, 194 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 11205ae67c..98a5998d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -272,7 +272,12 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
- client.createFunction(db, funcDefinition)
+ // Hive's metastore is case insensitive. However, Hive's createFunction does
+ // not normalize the function name (unlike the getFunction part). So,
+ // we are normalizing the function name.
+ val functionName = funcDefinition.identifier.funcName.toLowerCase
+ val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
+ client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
override def dropFunction(db: String, name: String): Unit = withClient {
@@ -283,10 +288,6 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.renameFunction(db, oldName, newName)
}
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
- client.alterFunction(db, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
client.getFunction(db, funcName)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index dfbf22cc47..d315f39a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,27 +17,39 @@
package org.apache.spark.sql.hive
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
-class HiveSessionCatalog(
+private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
client: HiveClient,
context: HiveContext,
+ functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf)
- extends SessionCatalog(externalCatalog, functionRegistry, conf) {
+ extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
override def setCurrentDatabase(db: String): Unit = {
super.setCurrentDatabase(db)
@@ -112,4 +124,129 @@ class HiveSessionCatalog(
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
}
+ override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+ makeFunctionBuilder(funcName, Utils.classForName(className))
+ }
+
+ /**
+ * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ */
+ private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+ // When we instantiate hive UDF wrapper class, we may throw exception if the input
+ // expressions don't satisfy the hive UDF, such as type mismatch, input number
+ // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+ (children: Seq[Expression]) => {
+ try {
+ if (classOf[UDF].isAssignableFrom(clazz)) {
+ val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+ val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(
+ name,
+ new HiveFunctionWrapper(clazz.getName),
+ children,
+ isUDAFBridgeRequired = true)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+ val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udtf.elementTypes // Force it to check input data types.
+ udtf
+ } else {
+ throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+ }
+ } catch {
+ case ae: AnalysisException =>
+ throw ae
+ case NonFatal(e) =>
+ val analysisException =
+ new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+ analysisException.setStackTrace(e.getStackTrace)
+ throw analysisException
+ }
+ }
+ }
+
+ // We have a list of Hive built-in functions that we do not support. So, we will check
+ // Hive's function registry and lazily load needed functions into our own function registry.
+ // Those Hive built-in functions are
+ // assert_true, collect_list, collect_set, compute_stats, context_ngrams, create_union,
+ // current_user ,elt, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+ // histogram_numeric, in_file, index, inline, java_method, map_keys, map_values,
+ // matchpath, ngrams, noop, noopstreaming, noopwithmap, noopwithmapstreaming,
+ // parse_url, parse_url_tuple, percentile, percentile_approx, posexplode, reflect, reflect2,
+ // regexp, sentences, stack, std, str_to_map, windowingtablefunction, xpath, xpath_boolean,
+ // xpath_double, xpath_float, xpath_int, xpath_long, xpath_number,
+ // xpath_short, and xpath_string.
+ override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
+ // if (super.functionExists(name)) {
+ // super.lookupFunction(name, children)
+ // } else {
+ // // This function is a Hive builtin function.
+ // ...
+ // }
+ Try(super.lookupFunction(name, children)) match {
+ case Success(expr) => expr
+ case Failure(error) =>
+ if (functionRegistry.functionExists(name)) {
+ // If the function actually exists in functionRegistry, it means that there is an
+ // error when we create the Expression using the given children.
+ // We need to throw the original exception.
+ throw error
+ } else {
+ // This function is not in functionRegistry, let's try to load it as a Hive's
+ // built-in function.
+ // Hive is case insensitive.
+ val functionName = name.toLowerCase
+ // TODO: This may not really work for current_user because current_user is not evaluated
+ // with session info.
+ // We do not need to use executionHive at here because we only load
+ // Hive's builtin functions, which do not need current db.
+ val functionInfo = {
+ try {
+ Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+ failFunctionLookup(name))
+ } catch {
+ // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+ // we are failing to load a Hive builtin function, which means that
+ // the given function is not a Hive builtin function.
+ case NonFatal(e) => failFunctionLookup(name)
+ }
+ }
+ val className = functionInfo.getFunctionClass.getName
+ val builder = makeFunctionBuilder(functionName, className)
+ // Put this Hive built-in function to our function registry.
+ val info = new ExpressionInfo(className, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(functionName, children)
+ }
+ }
+ }
+
+ // Pre-load a few commonly used Hive built-in functions.
+ HiveSessionCatalog.preloadedHiveBuiltinFunctions.foreach {
+ case (functionName, clazz) =>
+ val builder = makeFunctionBuilder(functionName, clazz)
+ val info = new ExpressionInfo(clazz.getCanonicalName, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ }
+}
+
+private[sql] object HiveSessionCatalog {
+ // This is the list of Hive's built-in functions that are commonly used and we want to
+ // pre-load when we create the FunctionRegistry.
+ val preloadedHiveBuiltinFunctions =
+ ("collect_set", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet]) ::
+ ("collect_list", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList]) :: Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 829afa8432..cff24e28fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -36,25 +36,23 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
}
/**
- * Internal catalog for managing functions registered by the user.
- * Note that HiveUDFs will be overridden by functions registered in this context.
- */
- override lazy val functionRegistry: FunctionRegistry = {
- new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
- }
-
- /**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
- new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, functionRegistry, conf)
+ new HiveSessionCatalog(
+ ctx.hiveCatalog,
+ ctx.metadataHive,
+ ctx,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
}
/**
* An analyzer that uses the Hive metastore.
*/
override lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a31178e347..1f66fbfd85 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,13 +21,14 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -37,6 +38,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
@@ -611,6 +613,9 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
+ new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+ }
new HiveFunction(
f.identifier.funcName,
db,
@@ -619,12 +624,21 @@ private[hive] class HiveClientImpl(
PrincipalType.USER,
(System.currentTimeMillis / 1000).toInt,
FunctionType.JAVA,
- List.empty[ResourceUri].asJava)
+ resourceUris.asJava)
}
private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
- new CatalogFunction(name, hf.getClassName)
+ val resources = hf.getResourceUris.asScala.map { uri =>
+ val resourceType = uri.getResourceType() match {
+ case ResourceType.ARCHIVE => "archive"
+ case ResourceType.FILE => "file"
+ case ResourceType.JAR => "jar"
+ case r => throw new AnalysisException(s"Unknown resource type: $r")
+ }
+ (resourceType, uri.getUri())
+ }
+ new CatalogFunction(name, hf.getClassName, resources)
}
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 55e69f99a4..c6c0b2ca59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -21,13 +21,13 @@ import scala.collection.JavaConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.parse.EximUtil
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -278,19 +278,6 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
}
/**
- * Create a [[Generator]]. Override this method in order to support custom Generators.
- */
- override protected def withGenerator(
- name: String,
- expressions: Seq[Expression],
- ctx: LateralViewContext): Generator = {
- val info = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse {
- throw new ParseException(s"Couldn't find Generator function '$name'", ctx)
- }
- HiveGenericUDTF(name, new HiveFunctionWrapper(info.getFunctionClass.getName), expressions)
- }
-
- /**
* Create a [[HiveScriptIOSchema]].
*/
override protected def withScriptIOSchema(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 5ada3d5598..784b018353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
import org.apache.hadoop.hive.ql.exec._
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -31,130 +30,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, O
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{analysis, InternalRow}
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types._
-private[hive] class HiveFunctionRegistry(
- underlying: analysis.FunctionRegistry,
- executionHive: HiveClientImpl)
- extends analysis.FunctionRegistry with HiveInspectors {
-
- def getFunctionInfo(name: String): FunctionInfo = {
- // Hive Registry need current database to lookup function
- // TODO: the current database of executionHive should be consistent with metadataHive
- executionHive.withHiveState {
- FunctionRegistry.getFunctionInfo(name)
- }
- }
-
- override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- Try(underlying.lookupFunction(name, children)).getOrElse {
- // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
- // not always serializable.
- val functionInfo: FunctionInfo =
- Option(getFunctionInfo(name.toLowerCase)).getOrElse(
- throw new AnalysisException(s"undefined function $name"))
-
- val functionClassName = functionInfo.getFunctionClass.getName
-
- // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions
- // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
- // catch the exception and throw AnalysisException instead.
- try {
- if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(
- name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (
- classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(
- name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children)
- udtf.elementTypes // Force it to check input data types.
- udtf
- } else {
- throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}")
- }
- } catch {
- case analysisException: AnalysisException =>
- // If the exception is an AnalysisException, just throw it.
- throw analysisException
- case throwable: Throwable =>
- // If there is any other error, we throw an AnalysisException.
- val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
- s"because: ${throwable.getMessage}."
- val analysisException = new AnalysisException(errorMessage)
- analysisException.setStackTrace(throwable.getStackTrace)
- throw analysisException
- }
- }
- }
-
- override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
- : Unit = underlying.registerFunction(name, info, builder)
-
- /* List all of the registered function names. */
- override def listFunction(): Seq[String] = {
- (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted
- }
-
- /* Get the class of the registered function by specified name. */
- override def lookupFunction(name: String): Option[ExpressionInfo] = {
- underlying.lookupFunction(name).orElse(
- Try {
- val info = getFunctionInfo(name)
- val annotation = info.getFunctionClass.getAnnotation(classOf[Description])
- if (annotation != null) {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- annotation.name(),
- annotation.value(),
- annotation.extended()))
- } else {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- name,
- null,
- null))
- }
- }.getOrElse(None))
- }
-
- override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = {
- underlying.lookupFunctionBuilder(name)
- }
-
- // Note: This does not drop functions stored in the metastore
- override def dropFunction(name: String): Boolean = {
- underlying.dropFunction(name)
- }
-
-}
-
private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9393302355..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -201,8 +201,13 @@ class TestHiveContext private[hive](
}
override lazy val functionRegistry = {
- new TestHiveFunctionRegistry(
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+ // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
+ // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
+ val fr = new TestHiveFunctionRegistry
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
+ case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ }
+ fr
}
}
@@ -528,19 +533,18 @@ class TestHiveContext private[hive](
}
-private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
- extends HiveFunctionRegistry(fr, client) {
+private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
private val removedFunctions =
collection.mutable.ArrayBuffer.empty[(String, (ExpressionInfo, FunctionBuilder))]
def unregisterFunction(name: String): Unit = {
- fr.functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
+ functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
}
def restore(): Unit = {
removedFunctions.foreach {
- case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ case (name, (info, builder)) => registerFunction(name, info, builder)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 53dec6348f..dd2129375d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -32,6 +32,8 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -55,6 +57,57 @@ class HiveSparkSubmitSuite
System.setProperty("spark.testing", "true")
}
+ test("temporary Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", TemporaryHiveUDFTest.getClass.getName.stripSuffix("$"),
+ "--name", "TemporaryHiveUDFTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest1.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest1",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: use a already defined permanent function") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest2.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest2",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
test("SPARK-8368: includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -208,6 +261,118 @@ class HiveSparkSubmitSuite
}
}
+// This application is used to test defining a new Hive UDF (with an associated jar)
+// and use this UDF. We need to run this test in separate JVM to make sure we
+// can load the jar defined with the function.
+object TemporaryHiveUDFTest extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a temporary Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE TEMPORARY FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP temporary FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test defining a new Hive UDF (with an associated jar)
+// and use this UDF. We need to run this test in separate JVM to make sure we
+// can load the jar defined with the function.
+object PermanentHiveUDFTest1 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a permanent Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test that a pre-defined permanent function with a jar
+// resources can be used. We need to run this test in separate JVM to make sure we
+// can load the jar defined with the function.
+object PermanentHiveUDFTest2 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+ // Load a Hive UDF from the jar.
+ logInfo("Write the metadata of a permanent Hive UDF into metastore.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ val function = CatalogFunction(
+ FunctionIdentifier("example_max"),
+ "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
+ ("JAR" -> jar) :: Nil)
+ hiveContext.sessionState.catalog.createFunction(function)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
// This object is used for testing SPARK-8368: https://issues.apache.org/jira/browse/SPARK-8368.
// We test if we can load user jars in both driver and executors when HiveContext is used.
object SparkSubmitClassLoaderTest extends Logging {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 3ab4576811..d1aa5aa931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -17,12 +17,51 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
case class FunctionResult(f1: String, f2: String)
-class UDFSuite extends QueryTest with TestHiveSingleton {
+/**
+ * A test suite for UDF related functionalities. Because Hive metastore is
+ * case insensitive, database names and function names have both upper case
+ * letters and lower case letters.
+ */
+class UDFSuite
+ extends QueryTest
+ with SQLTestUtils
+ with TestHiveSingleton
+ with BeforeAndAfterEach {
+
+ import hiveContext.implicits._
+
+ private[this] val functionName = "myUPper"
+ private[this] val functionNameUpper = "MYUPPER"
+ private[this] val functionNameLower = "myupper"
+
+ private[this] val functionClass =
+ classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+
+ private var testDF: DataFrame = null
+ private[this] val testTableName = "testDF_UDFSuite"
+ private var expectedDF: DataFrame = null
+
+ override def beforeAll(): Unit = {
+ sql("USE default")
+
+ testDF = (1 to 10).map(i => s"sTr$i").toDF("value")
+ testDF.registerTempTable(testTableName)
+ expectedDF = (1 to 10).map(i => s"STR$i").toDF("value")
+ super.beforeAll()
+ }
+
+ override def afterEach(): Unit = {
+ sql("USE default")
+ super.afterEach()
+ }
test("UDF case insensitive") {
hiveContext.udf.register("random0", () => { Math.random() })
@@ -32,4 +71,128 @@ class UDFSuite extends QueryTest with TestHiveSingleton {
assert(hiveContext.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
assert(hiveContext.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
}
+
+ test("temporary function: create and drop") {
+ withUserDefinedFunction(functionName -> true) {
+ intercept[AnalysisException] {
+ sql(s"CREATE TEMPORARY FUNCTION default.$functionName AS '$functionClass'")
+ }
+ sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$functionClass'")
+ checkAnswer(
+ sql(s"SELECT $functionNameLower(value) from $testTableName"),
+ expectedDF
+ )
+ intercept[AnalysisException] {
+ sql(s"DROP TEMPORARY FUNCTION default.$functionName")
+ }
+ }
+ }
+
+ test("permanent function: create and drop without specifying db name") {
+ withUserDefinedFunction(functionName -> false) {
+ sql(s"CREATE FUNCTION $functionName AS '$functionClass'")
+ checkAnswer(
+ sql("SHOW functions like '.*upper'"),
+ Row(s"default.$functionNameLower")
+ )
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+ assert(
+ sql("SHOW functions").collect()
+ .map(_.getString(0))
+ .contains(s"default.$functionNameLower"))
+ }
+ }
+
+ test("permanent function: create and drop with a db name") {
+ // For this block, drop function command uses functionName as the function name.
+ withUserDefinedFunction(functionNameUpper -> false) {
+ sql(s"CREATE FUNCTION default.$functionName AS '$functionClass'")
+ // TODO: Re-enable it after can distinguish qualified and unqualified function name
+ // in SessionCatalog.lookupFunction.
+ // checkAnswer(
+ // sql(s"SELECT default.myuPPer(value) from $testTableName"),
+ // expectedDF
+ // )
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+ checkAnswer(
+ sql(s"SELECT default.$functionName(value) from $testTableName"),
+ expectedDF
+ )
+ }
+
+ // For this block, drop function command uses default.functionName as the function name.
+ withUserDefinedFunction(s"DEfault.$functionNameLower" -> false) {
+ sql(s"CREATE FUNCTION dEFault.$functionName AS '$functionClass'")
+ checkAnswer(
+ sql(s"SELECT $functionNameUpper(value) from $testTableName"),
+ expectedDF
+ )
+ }
+ }
+
+ test("permanent function: create and drop a function in another db") {
+ // For this block, drop function command uses functionName as the function name.
+ withTempDatabase { dbName =>
+ withUserDefinedFunction(functionName -> false) {
+ sql(s"CREATE FUNCTION $dbName.$functionName AS '$functionClass'")
+ // TODO: Re-enable it after can distinguish qualified and unqualified function name
+ // checkAnswer(
+ // sql(s"SELECT $dbName.myuPPer(value) from $testTableName"),
+ // expectedDF
+ // )
+
+ checkAnswer(
+ sql(s"SHOW FUNCTIONS like $dbName.$functionNameUpper"),
+ Row(s"$dbName.$functionNameLower")
+ )
+
+ sql(s"USE $dbName")
+
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE default")
+
+ checkAnswer(
+ sql(s"SELECT $dbName.$functionName(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE $dbName")
+ }
+
+ sql(s"USE default")
+
+ // For this block, drop function command uses default.functionName as the function name.
+ withUserDefinedFunction(s"$dbName.$functionNameUpper" -> false) {
+ sql(s"CREATE FUNCTION $dbName.$functionName AS '$functionClass'")
+ // TODO: Re-enable it after can distinguish qualified and unqualified function name
+ // checkAnswer(
+ // sql(s"SELECT $dbName.myupper(value) from $testTableName"),
+ // expectedDF
+ // )
+
+ sql(s"USE $dbName")
+
+ assert(
+ sql("SHOW functions").collect()
+ .map(_.getString(0))
+ .contains(s"$dbName.$functionNameLower"))
+ checkAnswer(
+ sql(s"SELECT $functionNameLower(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE default")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index b951948fda..0c57ede9ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -62,7 +62,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.cacheTables = false
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
- sql("DROP TEMPORARY FUNCTION udtf_count2")
+ sql("DROP TEMPORARY FUNCTION IF EXISTS udtf_count2")
} finally {
super.afterAll()
}
@@ -1230,14 +1230,16 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
val e = intercept[AnalysisException] {
range(1).selectExpr("not_a_udf()")
}
- assert(e.getMessage.contains("undefined function not_a_udf"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("not_a_udf"))
var success = false
val t = new Thread("test") {
override def run(): Unit = {
val e = intercept[AnalysisException] {
range(1).selectExpr("not_a_udf()")
}
- assert(e.getMessage.contains("undefined function not_a_udf"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("not_a_udf"))
success = true
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index b0e263dff9..d07ac56586 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -303,7 +303,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDFTwoListList() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
}
@@ -313,7 +313,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDFAnd() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
}
@@ -323,7 +323,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
}
@@ -333,7 +333,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
}
@@ -343,7 +343,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDTFExplode() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6199253d34..14a1d4cd30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
@@ -67,22 +68,43 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext.implicits._
test("UDTF") {
- sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
- // The function source code can be found at:
- // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
- sql(
- """
- |CREATE TEMPORARY FUNCTION udtf_count2
- |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- """.stripMargin)
+ withUserDefinedFunction("udtf_count2" -> true) {
+ sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
+ // The function source code can be found at:
+ // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+ sql(
+ """
+ |CREATE TEMPORARY FUNCTION udtf_count2
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ """.stripMargin)
- checkAnswer(
- sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
- Row(97, 500) :: Row(97, 500) :: Nil)
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
- checkAnswer(
- sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
- Row(3) :: Row(3) :: Nil)
+ checkAnswer(
+ sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
+ }
+
+ test("permanent UDTF") {
+ withUserDefinedFunction("udtf_count_temp" -> false) {
+ sql(
+ s"""
+ |CREATE FUNCTION udtf_count_temp
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
+
+ checkAnswer(
+ sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
}
test("SPARK-6835: udtf in lateral view") {
@@ -169,9 +191,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("show functions") {
- val allBuiltinFunctions =
- (FunctionRegistry.builtin.listFunction().toSet[String] ++
- org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames.asScala).toList.sorted
+ val allBuiltinFunctions = FunctionRegistry.builtin.listFunction().toSet[String].toList.sorted
// The TestContext is shared by all the test cases, some functions may be registered before
// this, so we check that all the builtin functions are returned.
val allFunctions = sql("SHOW functions").collect().map(r => r(0))
@@ -183,11 +203,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkAnswer(sql("SHOW functions abc.abs"), Row("abs"))
checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
- checkAnswer(sql("SHOW functions `~`"), Row("~"))
+ // TODO: Re-enable this test after we fix SPARK-14335.
+ // checkAnswer(sql("SHOW functions `~`"), Row("~"))
checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil)
- checkAnswer(sql("SHOW functions `weekofyea.*`"), Row("weekofyear"))
+ checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear"))
// this probably will failed if we add more function with `sha` prefixing.
- checkAnswer(sql("SHOW functions `sha.*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
+ checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
+ // Test '|' for alternation.
+ checkAnswer(
+ sql("SHOW functions 'sha*|weekofyea*'"),
+ Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil)
}
test("describe functions") {
@@ -211,10 +236,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkExistence(sql("describe functioN abcadf"), true,
"Function: abcadf not found.")
- checkExistence(sql("describe functioN `~`"), true,
- "Function: ~",
- "Class: org.apache.hadoop.hive.ql.udf.UDFOPBitNot",
- "Usage: ~ n - Bitwise not")
+ // TODO: Re-enable this test after we fix SPARK-14335.
+ // checkExistence(sql("describe functioN `~`"), true,
+ // "Function: ~",
+ // "Class: org.apache.hadoop.hive.ql.udf.UDFOPBitNot",
+ // "Usage: ~ n - Bitwise not")
}
test("SPARK-5371: union with null and sum") {