-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 17
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala | 31
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 171
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala | 61
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala | 16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 14
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala | 20
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 132
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala | 1
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala | 114
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 22
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 94
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 143
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala | 17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 119
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 165
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala | 167
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 74
39 files changed, 1100 insertions, 513 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3e0a6d29b4..473c91e69e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -45,10 +45,7 @@ object SimpleAnalyzer
new SimpleCatalystConf(caseSensitiveAnalysis = true))
class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
- extends Analyzer(
- new SessionCatalog(new InMemoryCatalog, functionRegistry, conf),
- functionRegistry,
- conf)
+ extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -57,7 +54,6 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
*/
class Analyzer(
catalog: SessionCatalog,
- registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
@@ -756,9 +752,18 @@ class Analyzer(
case q: LogicalPlan =>
q transformExpressions {
case u if !u.childrenResolved => u // Skip until children are resolved.
+ case u @ UnresolvedGenerator(name, children) =>
+ withPosition(u) {
+ catalog.lookupFunction(name, children) match {
+ case generator: Generator => generator
+ case other =>
+ failAnalysis(s"$name is expected to be a generator. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
+ }
+ }
case u @ UnresolvedFunction(name, children, isDistinct) =>
withPosition(u) {
- registry.lookupFunction(name, children) match {
+ catalog.lookupFunction(name, children) match {
// DISTINCT is not meaningful for a Max or a Min.
case max: Max if isDistinct =>
AggregateExpression(max, Complete, isDistinct = false)
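
With this change, the analyzer resolves generators referenced in LATERAL VIEW clauses through SessionCatalog.lookupFunction rather than the parser rejecting them outright. A short sketch of the query shape this enables, assuming a SQLContext named sqlContext and a hypothetical table people with an array column friends:

    // The parser turns posexplode(...) into an UnresolvedGenerator; the rule above
    // resolves it during analysis, provided a function named posexplode exists in
    // the session catalog (built-in, temporary, or loaded from the metastore).
    sqlContext.sql(
      """SELECT name, pos, friend
        |FROM people
        |LATERAL VIEW posexplode(friends) exploded AS pos, friend""".stripMargin)
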
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index ca8db3cbc5..7af5ffbe47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -52,6 +52,8 @@ trait FunctionRegistry {
/** Drop a function and return whether the function existed. */
def dropFunction(name: String): Boolean
+ /** Checks if a function with a given name exists. */
+ def functionExists(name: String): Boolean = lookupFunction(name).isDefined
}
class SimpleFunctionRegistry extends FunctionRegistry {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index fbbf6302e9..b2f362b6b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{errors, TableIdentifier}
+import org.apache.spark.sql.catalyst.{errors, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -133,6 +133,33 @@ object UnresolvedAttribute {
}
}
+/**
+ * Represents an unresolved generator, which will be created by the parser for
+ * the [[org.apache.spark.sql.catalyst.plans.logical.Generate]] operator.
+ * The analyzer will resolve this generator.
+ */
+case class UnresolvedGenerator(name: String, children: Seq[Expression]) extends Generator {
+
+ override def elementTypes: Seq[(DataType, Boolean, String)] =
+ throw new UnresolvedException(this, "elementTypes")
+ override def dataType: DataType = throw new UnresolvedException(this, "dataType")
+ override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
+ override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
+ override lazy val resolved = false
+
+ override def prettyName: String = name
+ override def toString: String = s"'$name(${children.mkString(", ")})"
+
+ override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+
+ override protected def genCode(ctx: CodegenContext, ev: ExprCode): String =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+
+ override def terminate(): TraversableOnce[InternalRow] =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+}
+
case class UnresolvedFunction(
name: String,
children: Seq[Expression],
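
For reference, a tiny sketch of what the parser now produces for a generator it does not special-case (the attribute name x is hypothetical; the output strings follow the toString definitions above):

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedGenerator}

    val g = UnresolvedGenerator("posexplode", Seq(UnresolvedAttribute("x")))
    g.resolved   // false: the analyzer must replace it with a concrete Generator
    g.toString   // "'posexplode('x)", Catalyst's quote-prefixed notation for unresolved nodes
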
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 2bbb970ec9..2af0107fa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -315,11 +315,6 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).functions.put(newName, newFunc)
}
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.identifier.funcName)
- catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions(funcName)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3b8ce6373d..c08ffbb235 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -39,17 +39,21 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
*/
class SessionCatalog(
externalCatalog: ExternalCatalog,
+ functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf) {
import ExternalCatalog._
- def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) {
- this(externalCatalog, functionRegistry, new SimpleCatalystConf(true))
+ def this(
+ externalCatalog: ExternalCatalog,
+ functionRegistry: FunctionRegistry,
+ conf: CatalystConf) {
+ this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
}
// For testing only.
def this(externalCatalog: ExternalCatalog) {
- this(externalCatalog, new SimpleFunctionRegistry)
+ this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
}
protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
@@ -439,53 +443,88 @@ class SessionCatalog(
*/
def dropFunction(name: FunctionIdentifier): Unit = {
val db = name.database.getOrElse(currentDb)
+ val qualified = name.copy(database = Some(db)).unquotedString
+ if (functionRegistry.functionExists(qualified)) {
+ // If we have loaded this function into the FunctionRegistry, also drop it
+ // from there. This covers permanent functions as well: because a permanent
+ // function is loaded into the FunctionRegistry when it is first used, it
+ // also needs to be dropped from the FunctionRegistry here.
+ functionRegistry.dropFunction(qualified)
+ }
externalCatalog.dropFunction(db, name.funcName)
}
/**
- * Alter a metastore function whose name that matches the one specified in `funcDefinition`.
- *
- * If no database is specified in `funcDefinition`, assume the function is in the
- * current database.
- *
- * Note: If the underlying implementation does not support altering a certain field,
- * this becomes a no-op.
- */
- def alterFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.identifier.database.getOrElse(currentDb)
- val newFuncDefinition = funcDefinition.copy(
- identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- externalCatalog.alterFunction(db, newFuncDefinition)
- }
-
- /**
* Retrieve the metadata of a metastore function.
*
* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
+ // TODO: have a better name. This method is actually for fetching the metadata of a function.
def getFunction(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
externalCatalog.getFunction(db, name.funcName)
}
+ /**
+ * Check if the specified function exists.
+ */
+ def functionExists(name: FunctionIdentifier): Boolean = {
+ if (functionRegistry.functionExists(name.unquotedString)) {
+ // This function exists in the FunctionRegistry.
+ true
+ } else {
+ // Need to check if this function exists in the metastore.
+ try {
+ // TODO: It's better to ask external catalog if this function exists.
+ // So, we can avoid having this hacky try/catch block.
+ getFunction(name) != null
+ } catch {
+ case _: NoSuchFunctionException => false
+ case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
+ }
+ }
+ }
// ----------------------------------------------------------------
// | Methods that interact with temporary and metastore functions |
// ----------------------------------------------------------------
/**
+ * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ *
+ * This performs reflection to decide what type of [[Expression]] to return in the builder.
+ */
+ private[sql] def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+ // TODO: at least support UDAFs here
+ throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
+ }
+
+ /**
+ * Loads resources such as JARs and Files for a function. Every resource is represented
+ * by a tuple (resource type, resource uri).
+ */
+ def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
+ resources.foreach { case (resourceType, uri) =>
+ val functionResource =
+ FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
+ functionResourceLoader.loadResource(functionResource)
+ }
+ }
+
+ /**
* Create a temporary function.
* This assumes no database is specified in `funcDefinition`.
*/
def createTempFunction(
name: String,
+ info: ExpressionInfo,
funcDefinition: FunctionBuilder,
ignoreIfExists: Boolean): Unit = {
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
throw new AnalysisException(s"Temporary function '$name' already exists.")
}
- functionRegistry.registerFunction(name, funcDefinition)
+ functionRegistry.registerFunction(name, info, funcDefinition)
}
/**
@@ -501,41 +540,59 @@ class SessionCatalog(
}
}
- /**
- * Rename a function.
- *
- * If a database is specified in `oldName`, this will rename the function in that database.
- * If no database is specified, this will first attempt to rename a temporary function with
- * the same name, then, if that does not exist, rename the function in the current database.
- *
- * This assumes the database specified in `oldName` matches the one specified in `newName`.
- */
- def renameFunction(oldName: FunctionIdentifier, newName: FunctionIdentifier): Unit = {
- if (oldName.database != newName.database) {
- throw new AnalysisException("rename does not support moving functions across databases")
- }
- val db = oldName.database.getOrElse(currentDb)
- val oldBuilder = functionRegistry.lookupFunctionBuilder(oldName.funcName)
- if (oldName.database.isDefined || oldBuilder.isEmpty) {
- externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
- } else {
- val oldExpressionInfo = functionRegistry.lookupFunction(oldName.funcName).get
- val newExpressionInfo = new ExpressionInfo(
- oldExpressionInfo.getClassName,
- newName.funcName,
- oldExpressionInfo.getUsage,
- oldExpressionInfo.getExtended)
- functionRegistry.dropFunction(oldName.funcName)
- functionRegistry.registerFunction(newName.funcName, newExpressionInfo, oldBuilder.get)
- }
+ protected def failFunctionLookup(name: String): Nothing = {
+ throw new AnalysisException(s"Undefined function: $name. This function is " +
+ s"neither a registered temporary function nor " +
+ s"a permanent function registered in the database $currentDb.")
}
/**
* Return an [[Expression]] that represents the specified function, assuming it exists.
- * Note: This is currently only used for temporary functions.
+ *
+ * For a temporary function or a permanent function that has been loaded,
+ * this method will simply look up the function through the
+ * FunctionRegistry and create an expression based on the builder.
+ *
+ * For a permanent function that has not been loaded, we will first fetch its metadata
+ * from the underlying external catalog. Then, we will load all resources associated
+ * with this function (i.e. jars and files). Finally, we create a function builder
+ * based on the function class and put the builder into the FunctionRegistry.
+ * The name of this function in the FunctionRegistry will be `databaseName.functionName`.
*/
def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- functionRegistry.lookupFunction(name, children)
+ // TODO: Right now, the name can be qualified or not qualified.
+ // It will be better to get a FunctionIdentifier.
+ // TODO: Right now, we assume that name is not qualified!
+ val qualifiedName = FunctionIdentifier(name, Some(currentDb)).unquotedString
+ if (functionRegistry.functionExists(name)) {
+ // This function has been already loaded into the function registry.
+ functionRegistry.lookupFunction(name, children)
+ } else if (functionRegistry.functionExists(qualifiedName)) {
+ // This function has been already loaded into the function registry.
+ // Unlike the above block, we find this function by using the qualified name.
+ functionRegistry.lookupFunction(qualifiedName, children)
+ } else {
+ // The function has not been loaded to the function registry, which means
+ // that the function is a permanent function (if it actually has been registered
+ // in the metastore). We need to first put the function in the FunctionRegistry.
+ val catalogFunction = try {
+ externalCatalog.getFunction(currentDb, name)
+ } catch {
+ case e: AnalysisException => failFunctionLookup(name)
+ case e: NoSuchFunctionException => failFunctionLookup(name)
+ }
+ loadFunctionResources(catalogFunction.resources)
+ // Please note that qualifiedName is provided by the user. However,
+ // catalogFunction.identifier.unquotedString is returned by the underlying
+ // catalog. So, it is possible that qualifiedName is not exactly the same as
+ // catalogFunction.identifier.unquotedString (they may differ in case).
+ // Here, we preserve the input from the user.
+ val info = new ExpressionInfo(catalogFunction.className, qualifiedName)
+ val builder = makeFunctionBuilder(qualifiedName, catalogFunction.className)
+ createTempFunction(qualifiedName, info, builder, ignoreIfExists = false)
+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(qualifiedName, children)
+ }
}
/**
@@ -545,17 +602,11 @@ class SessionCatalog(
val dbFunctions =
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
- val _tempFunctions = functionRegistry.listFunction()
+ val loadedFunctions = functionRegistry.listFunction()
.filter { f => regex.pattern.matcher(f).matches() }
.map { f => FunctionIdentifier(f) }
- dbFunctions ++ _tempFunctions
- }
-
- /**
- * Return a temporary function. For testing only.
- */
- private[catalog] def getTempFunction(name: String): Option[FunctionBuilder] = {
- functionRegistry.lookupFunctionBuilder(name)
+ // TODO: Some dbFunctions may already have been loaded into the FunctionRegistry,
+ // so the returned list may contain two entries for the same function.
+ dbFunctions ++ loadedFunctions
}
-
}
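
The revised lookupFunction resolves a name in three steps: the FunctionRegistry under the bare name (temporary functions and built-ins), then under the database-qualified name (permanent functions that have already been loaded), and finally the external catalog, loading the function's resources and caching a builder under the qualified name. Below is a standalone sketch of that ordering using simplified stand-in types, not Spark's FunctionRegistry or ExternalCatalog:

    object FunctionLookupSketch {
      type Builder = Seq[Int] => Int   // stand-in for Catalyst's FunctionBuilder

      class Registry {
        private val fns = scala.collection.mutable.Map.empty[String, Builder]
        def exists(name: String): Boolean = fns.contains(name)
        def register(name: String, b: Builder): Unit = fns(name) = b
        def lookup(name: String, args: Seq[Int]): Int = fns(name)(args)
      }

      def lookupFunction(
          registry: Registry,
          external: Map[String, Builder],   // permanent functions in the metastore
          currentDb: String,
          name: String,
          args: Seq[Int]): Int = {
        val qualified = s"$currentDb.$name"
        if (registry.exists(name)) {
          registry.lookup(name, args)          // temporary function or built-in
        } else if (registry.exists(qualified)) {
          registry.lookup(qualified, args)     // permanent function, already loaded
        } else {
          // Fetch from the metastore, load its resources, cache under the qualified name.
          val builder = external.getOrElse(name, sys.error(s"Undefined function: $name"))
          registry.register(qualified, builder)
          registry.lookup(qualified, args)
        }
      }
    }
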
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
new file mode 100644
index 0000000000..5adcc892cf
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+/** A trait that represents the type of a resource needed by a function. */
+sealed trait FunctionResourceType
+
+object JarResource extends FunctionResourceType
+
+object FileResource extends FunctionResourceType
+
+// We do not allow users to specify an archive because it is YARN specific.
+// When loading resources, we will throw an exception and ask users to
+// use --archives with spark-submit.
+object ArchiveResource extends FunctionResourceType
+
+object FunctionResourceType {
+ def fromString(resourceType: String): FunctionResourceType = {
+ resourceType.toLowerCase match {
+ case "jar" => JarResource
+ case "file" => FileResource
+ case "archive" => ArchiveResource
+ case other =>
+ throw new AnalysisException(s"Resource Type '$resourceType' is not supported.")
+ }
+ }
+}
+
+case class FunctionResource(resourceType: FunctionResourceType, uri: String)
+
+/**
+ * A simple trait representing a class that can be used to load resources used by
+ * a function. Because only a SQLContext can load resources, we create this trait
+ * to avoid explicitly passing SQLContext around.
+ */
+trait FunctionResourceLoader {
+ def loadResource(resource: FunctionResource): Unit
+}
+
+object DummyFunctionResourceLoader extends FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ throw new UnsupportedOperationException
+ }
+}
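
A small usage sketch of the types added in this file (the jar path is a placeholder):

    import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType, JarResource}

    val res = FunctionResource(FunctionResourceType.fromString("JAR"), "/tmp/my-udfs.jar")
    assert(res.resourceType == JarResource)
    // FunctionResourceType.fromString("zip") would throw:
    //   AnalysisException: Resource Type 'zip' is not supported.
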
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 303846d313..97b9946140 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -150,15 +150,6 @@ abstract class ExternalCatalog {
def renameFunction(db: String, oldName: String, newName: String): Unit
- /**
- * Alter a function whose name that matches the one specified in `funcDefinition`,
- * assuming the function exists.
- *
- * Note: If the underlying implementation does not support altering a certain field,
- * this becomes a no-op.
- */
- def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
-
def getFunction(db: String, funcName: String): CatalogFunction
def listFunctions(db: String, pattern: String): Seq[String]
@@ -171,8 +162,13 @@ abstract class ExternalCatalog {
*
* @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
+ * @param resources resource types and URIs used by the function
*/
-case class CatalogFunction(identifier: FunctionIdentifier, className: String)
+// TODO: Use FunctionResource instead of (String, String) as the element type of resources.
+case class CatalogFunction(
+ identifier: FunctionIdentifier,
+ className: String,
+ resources: Seq[(String, String)])
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 87f4d1b007..aae75956ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -25,10 +25,10 @@ package org.apache.spark.sql.catalyst
* Format (quoted): "`name`" or "`db`.`name`"
*/
sealed trait IdentifierWithDatabase {
- val name: String
+ val identifier: String
def database: Option[String]
- def quotedString: String = database.map(db => s"`$db`.`$name`").getOrElse(s"`$name`")
- def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name)
+ def quotedString: String = database.map(db => s"`$db`.`$identifier`").getOrElse(s"`$identifier`")
+ def unquotedString: String = database.map(db => s"$db.$identifier").getOrElse(identifier)
override def toString: String = quotedString
}
@@ -36,13 +36,15 @@ sealed trait IdentifierWithDatabase {
/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
+ * When we register a permanent function in the FunctionRegistry, we use
+ * unquotedString as the function name.
*/
case class TableIdentifier(table: String, database: Option[String])
extends IdentifierWithDatabase {
- override val name: String = table
+ override val identifier: String = table
- def this(name: String) = this(name, None)
+ def this(table: String) = this(table, None)
}
@@ -58,9 +60,9 @@ object TableIdentifier {
case class FunctionIdentifier(funcName: String, database: Option[String])
extends IdentifierWithDatabase {
- override val name: String = funcName
+ override val identifier: String = funcName
- def this(name: String) = this(name, None)
+ def this(funcName: String) = this(funcName, None)
}
object FunctionIdentifier {
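
The renamed identifier field feeds both string forms; for example, based on the definitions above:

    import org.apache.spark.sql.catalyst.FunctionIdentifier

    val fid = FunctionIdentifier("myFunc", Some("db1"))
    fid.quotedString     // `db1`.`myFunc`
    fid.unquotedString   // db1.myFunc -- the key under which a loaded permanent
                         // function is cached in the FunctionRegistry
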
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 14c90918e6..5a3aebff09 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -549,8 +549,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
Explode(expressions.head)
case "json_tuple" =>
JsonTuple(expressions)
- case other =>
- withGenerator(other, expressions, ctx)
+ case name =>
+ UnresolvedGenerator(name, expressions)
}
Generate(
@@ -563,16 +563,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
/**
- * Create a [[Generator]]. Override this method in order to support custom Generators.
- */
- protected def withGenerator(
- name: String,
- expressions: Seq[Expression],
- ctx: LateralViewContext): Generator = {
- throw new ParseException(s"Generator function '$name' is not supported", ctx)
- }
-
- /**
* Create a joins between two or more logical plans.
*/
override def visitJoinRelation(ctx: JoinRelationContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 3ec95ef5b5..b1fcf011f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -32,7 +32,7 @@ trait AnalysisTest extends PlanTest {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
- new Analyzer(catalog, EmptyFunctionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 1a350bf847..b3b1f5b920 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types._
class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
private val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
- private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+ private val analyzer = new Analyzer(catalog, conf)
private val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 959bd564d9..fbcac09ce2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -433,7 +433,8 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
test("get function") {
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1") ==
- CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
+ CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+ Seq.empty[(String, String)]))
intercept[AnalysisException] {
catalog.getFunction("db2", "does_not_exist")
}
@@ -464,21 +465,6 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
}
}
- test("alter function") {
- val catalog = newBasicCatalog()
- assert(catalog.getFunction("db2", "func1").className == funcClass)
- catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
- assert(catalog.getFunction("db2", "func1").className == "muhaha")
- intercept[AnalysisException] { catalog.alterFunction("db2", newFunc("funcky")) }
- }
-
- test("alter function when database does not exist") {
- val catalog = newBasicCatalog()
- intercept[AnalysisException] {
- catalog.alterFunction("does_not_exist", newFunc())
- }
- }
-
test("list functions") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2"))
@@ -557,7 +543,7 @@ abstract class CatalogTestUtils {
}
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
- CatalogFunction(FunctionIdentifier(name, database), funcClass)
+ CatalogFunction(FunctionIdentifier(name, database), funcClass, Seq.empty[(String, String)])
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index acd97592b6..4d56d001b3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
@@ -685,19 +685,26 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
- catalog.createTempFunction("temp1", tempFunc1, ignoreIfExists = false)
- catalog.createTempFunction("temp2", tempFunc2, ignoreIfExists = false)
- assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
- assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
- assert(catalog.getTempFunction("temp3") == None)
+ val info1 = new ExpressionInfo("tempFunc1", "temp1")
+ val info2 = new ExpressionInfo("tempFunc2", "temp2")
+ catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false)
+ catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false)
+ val arguments = Seq(Literal(1), Literal(2), Literal(3))
+ assert(catalog.lookupFunction("temp1", arguments) === Literal(1))
+ assert(catalog.lookupFunction("temp2", arguments) === Literal(3))
+ // Temporary function does not exist.
+ intercept[AnalysisException] {
+ catalog.lookupFunction("temp3", arguments)
+ }
val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
+ val info3 = new ExpressionInfo("tempFunc3", "temp1")
// Temporary function already exists
intercept[AnalysisException] {
- catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = false)
+ catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false)
}
// Temporary function is overridden
- catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = true)
- assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
+ catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true)
+ assert(catalog.lookupFunction("temp1", arguments) === Literal(arguments.length))
}
test("drop function") {
@@ -726,11 +733,15 @@ class SessionCatalogSuite extends SparkFunSuite {
test("drop temp function") {
val catalog = new SessionCatalog(newBasicCatalog())
+ val info = new ExpressionInfo("tempFunc", "func1")
val tempFunc = (e: Seq[Expression]) => e.head
- catalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
- assert(catalog.getTempFunction("func1") == Some(tempFunc))
+ catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
+ val arguments = Seq(Literal(1), Literal(2), Literal(3))
+ assert(catalog.lookupFunction("func1", arguments) === Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
- assert(catalog.getTempFunction("func1") == None)
+ intercept[AnalysisException] {
+ catalog.lookupFunction("func1", arguments)
+ }
intercept[AnalysisException] {
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
}
@@ -739,7 +750,9 @@ class SessionCatalogSuite extends SparkFunSuite {
test("get function") {
val catalog = new SessionCatalog(newBasicCatalog())
- val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass)
+ val expected =
+ CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+ Seq.empty[(String, String)])
assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
// Get function without explicitly specifying database
catalog.setCurrentDatabase("db2")
@@ -758,8 +771,9 @@ class SessionCatalogSuite extends SparkFunSuite {
test("lookup temp function") {
val catalog = new SessionCatalog(newBasicCatalog())
+ val info1 = new ExpressionInfo("tempFunc1", "func1")
val tempFunc1 = (e: Seq[Expression]) => e.head
- catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
+ catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
assert(catalog.lookupFunction("func1", Seq(Literal(1), Literal(2), Literal(3))) == Literal(1))
catalog.dropTempFunction("func1", ignoreIfNotExists = false)
intercept[AnalysisException] {
@@ -767,98 +781,16 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
- test("rename function") {
- val externalCatalog = newBasicCatalog()
- val sessionCatalog = new SessionCatalog(externalCatalog)
- val newName = "funcky"
- assert(sessionCatalog.getFunction(
- FunctionIdentifier("func1", Some("db2"))) == newFunc("func1", Some("db2")))
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
- sessionCatalog.renameFunction(
- FunctionIdentifier("func1", Some("db2")), FunctionIdentifier(newName, Some("db2")))
- assert(sessionCatalog.getFunction(
- FunctionIdentifier(newName, Some("db2"))) == newFunc(newName, Some("db2")))
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set(newName))
- // Rename function without explicitly specifying database
- sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.renameFunction(FunctionIdentifier(newName), FunctionIdentifier("func1"))
- assert(sessionCatalog.getFunction(
- FunctionIdentifier("func1")) == newFunc("func1", Some("db2")))
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
- // Renaming "db2.func1" to "db1.func2" should fail because databases don't match
- intercept[AnalysisException] {
- sessionCatalog.renameFunction(
- FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func2", Some("db1")))
- }
- }
-
- test("rename function when database/function does not exist") {
- val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.renameFunction(
- FunctionIdentifier("func1", Some("does_not_exist")),
- FunctionIdentifier("func5", Some("does_not_exist")))
- }
- intercept[AnalysisException] {
- catalog.renameFunction(
- FunctionIdentifier("does_not_exist", Some("db2")),
- FunctionIdentifier("x", Some("db2")))
- }
- }
-
- test("rename temp function") {
- val externalCatalog = newBasicCatalog()
- val sessionCatalog = new SessionCatalog(externalCatalog)
- val tempFunc = (e: Seq[Expression]) => e.head
- sessionCatalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
- sessionCatalog.setCurrentDatabase("db2")
- // If a database is specified, we'll always rename the function in that database
- sessionCatalog.renameFunction(
- FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func3", Some("db2")))
- assert(sessionCatalog.getTempFunction("func1") == Some(tempFunc))
- assert(sessionCatalog.getTempFunction("func3") == None)
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3"))
- // If no database is specified, we'll first rename temporary functions
- sessionCatalog.createFunction(newFunc("func1", Some("db2")))
- sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4"))
- assert(sessionCatalog.getTempFunction("func4") == Some(tempFunc))
- assert(sessionCatalog.getTempFunction("func1") == None)
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3"))
- // Then, if no such temporary function exist, rename the function in the current database
- sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func5"))
- assert(sessionCatalog.getTempFunction("func5") == None)
- assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3", "func5"))
- }
-
- test("alter function") {
- val catalog = new SessionCatalog(newBasicCatalog())
- assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == funcClass)
- catalog.alterFunction(newFunc("func1", Some("db2")).copy(className = "muhaha"))
- assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == "muhaha")
- // Alter function without explicitly specifying database
- catalog.setCurrentDatabase("db2")
- catalog.alterFunction(newFunc("func1").copy(className = "derpy"))
- assert(catalog.getFunction(FunctionIdentifier("func1")).className == "derpy")
- }
-
- test("alter function when database/function does not exist") {
- val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.alterFunction(newFunc("func5", Some("does_not_exist")))
- }
- intercept[AnalysisException] {
- catalog.alterFunction(newFunc("funcky", Some("db2")))
- }
- }
-
test("list functions") {
val catalog = new SessionCatalog(newBasicCatalog())
+ val info1 = new ExpressionInfo("tempFunc1", "func1")
+ val info2 = new ExpressionInfo("tempFunc2", "yes_me")
val tempFunc1 = (e: Seq[Expression]) => e.head
val tempFunc2 = (e: Seq[Expression]) => e.last
catalog.createFunction(newFunc("func2", Some("db2")))
catalog.createFunction(newFunc("not_me", Some("db2")))
- catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
- catalog.createTempFunction("yes_me", tempFunc2, ignoreIfExists = false)
+ catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
+ catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
assert(catalog.listFunctions("db1", "*").toSet ==
Set(FunctionIdentifier("func1"),
FunctionIdentifier("yes_me")))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index dd6b5cac28..8147d06969 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -141,7 +141,6 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
private val caseInsensitiveConf = new SimpleCatalystConf(false)
private val caseInsensitiveAnalyzer = new Analyzer(
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, caseInsensitiveConf),
- EmptyFunctionRegistry,
caseInsensitiveConf)
test("(a && b) || (a && c) => a && (b || c) when case insensitive") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 009889d5a1..8c92ad82ac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules._
class EliminateSortsSuite extends PlanTest {
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
- val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+ val analyzer = new Analyzer(catalog, conf)
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 9e1660df06..262537d9c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.parser
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -296,10 +297,18 @@ class PlanParserSuite extends PlanTest {
.insertInto("t2"),
from.where('s < 10).select(star()).insertInto("t3")))
- // Unsupported generator.
- intercept(
+ // Unresolved generator.
+ val expected = table("t")
+ .generate(
+ UnresolvedGenerator("posexplode", Seq('x)),
+ join = true,
+ outer = false,
+ Some("posexpl"),
+ Seq("x", "y"))
+ .select(star())
+ assertEqual(
"select * from t lateral view posexplode(x) posexpl as x, y",
- "Generator function 'posexplode' is not supported")
+ expected)
}
test("joins") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d4290fee0a..587ba1ea05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
@@ -208,6 +208,22 @@ class SQLContext private[sql](
sparkContext.addJar(path)
}
+ /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
+ @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = {
+ new FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ resource.resourceType match {
+ case JarResource => addJar(resource.uri)
+ case FileResource => sparkContext.addFile(resource.uri)
+ case ArchiveResource =>
+ throw new AnalysisException(
+ "Archive is not allowed to be loaded. If YARN mode is used, " +
+ "please use --archives options while calling spark-submit.")
+ }
+ }
+ }
+ }
+
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fb106d1aef..382cc61fac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -337,10 +337,9 @@ class SparkSqlAstBuilder extends AstBuilder {
CreateFunction(
database,
function,
- string(ctx.className), // TODO this is not an alias.
+ string(ctx.className),
resources,
- ctx.TEMPORARY != null)(
- command(ctx))
+ ctx.TEMPORARY != null)
}
/**
@@ -353,7 +352,7 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
val (database, function) = visitFunctionName(ctx.qualifiedName)
- DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+ DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a4be3bc333..faa7a2cdb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -426,8 +426,12 @@ case class ShowTablePropertiesCommand(
* A command for users to list all of the registered functions.
* The syntax of using this command in SQL is:
* {{{
- * SHOW FUNCTIONS
+ * SHOW FUNCTIONS [LIKE pattern]
* }}}
+ * For the pattern, '*' matches any sequence of characters (including no characters) and
+ * '|' is for alternation.
+ * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
+ *
 * TODO currently we simply ignore the db
*/
case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
@@ -438,18 +442,17 @@ case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = pattern match {
- case Some(p) =>
- try {
- val regex = java.util.regex.Pattern.compile(p)
- sqlContext.sessionState.functionRegistry.listFunction()
- .filter(regex.matcher(_).matches()).map(Row(_))
- } catch {
- // probably will failed in the regex that user provided, then returns empty row.
- case _: Throwable => Seq.empty[Row]
- }
- case None =>
- sqlContext.sessionState.functionRegistry.listFunction().map(Row(_))
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ // If pattern is not specified, we use '*', which is used to
+ // match any sequence of characters (including no characters).
+ val functionNames =
+ sqlContext.sessionState.catalog
+ .listFunctions(dbName, pattern.getOrElse("*"))
+ .map(_.unquotedString)
+ // The session catalog caches some persistent functions in the FunctionRegistry
+ // so there can be duplicates.
+ functionNames.distinct.sorted.map(Row(_))
}
}
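
The pattern is now handled by SessionCatalog.listFunctions, whose doc above explains that '*' matches any character sequence and '|' expresses alternation. A minimal sketch of the translation step, mirroring the replaceAll("\\*", ".*") call used by listFunctions:

    val pattern = "yea*|windo*"                   // as in SHOW FUNCTIONS 'yea*|windo*'
    val regex = pattern.replaceAll("\\*", ".*").r
    Seq("year", "window", "month").filter(n => regex.pattern.matcher(n).matches())
    // => List("year", "window")
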
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index cd7e0eed65..6896881910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -175,26 +175,6 @@ case class DescribeDatabase(
}
}
-case class CreateFunction(
- databaseName: Option[String],
- functionName: String,
- alias: String,
- resources: Seq[(String, String)],
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-/**
- * The DDL command that drops a function.
- * ifExists: returns an error if the function doesn't exist, unless this is true.
- * isTemp: indicates if it is a temporary function.
- */
-case class DropFunction(
- databaseName: Option[String],
- functionName: String,
- ifExists: Boolean,
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
case class AlterTableRename(
oldName: TableIdentifier,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
new file mode 100644
index 0000000000..66d17e322e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+
+
+/**
+ * The DDL command that creates a function.
+ * To create a temporary function, the syntax of using this command in SQL is:
+ * {{{
+ * CREATE TEMPORARY FUNCTION functionName
+ * AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ *
+ * To create a permanent function, the syntax in SQL is:
+ * {{{
+ * CREATE FUNCTION [databaseName.]functionName
+ * AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ */
+// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
+case class CreateFunction(
+ databaseName: Option[String],
+ functionName: String,
+ className: String,
+ resources: Seq[(String, String)],
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when defining a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ // We first load resources and then put the builder in the function registry.
+ // Please note that it is allowed to overwrite an existing temp function.
+ sqlContext.sessionState.catalog.loadFunctionResources(resources)
+ val info = new ExpressionInfo(className, functionName)
+ val builder =
+ sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
+ sqlContext.sessionState.catalog.createTempFunction(
+ functionName, info, builder, ignoreIfExists = false)
+ } else {
+ // For a permanent function, we will store the metadata in the underlying external catalog.
+ // This function will be loaded into the FunctionRegistry when a query uses it.
+ // We do not load it into FunctionRegistry right now.
+ val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ val catalogFunc = CatalogFunction(func, className, resources)
+ if (sqlContext.sessionState.catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' already exists in database '$dbName'.")
+ }
+ sqlContext.sessionState.catalog.createFunction(catalogFunc)
+ }
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * The DDL command that drops a function.
+ * ifExists: if it is true, no error is raised when the function does not exist.
+ * isTemp: indicates if it is a temporary function.
+ */
+case class DropFunction(
+ databaseName: Option[String],
+ functionName: String,
+ ifExists: Boolean,
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when dropping a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ catalog.dropTempFunction(functionName, ifExists)
+ } else {
+ // We are dropping a permanent function.
+ val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ if (!ifExists && !catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' does not exist in database '$dbName'.")
+ }
+ catalog.dropFunction(func)
+ }
+ Seq.empty[Row]
+ }
+}
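
End to end, these commands are exercised through SQL. A hypothetical example (database, function name, class name, and jar path are placeholders, and the class must be resolvable on the classpath when the function is first used):

    // Permanent function: only metadata (class name + resources) is written to the
    // external catalog here; the builder is created lazily on first lookup.
    sqlContext.sql(
      """CREATE FUNCTION mydb.simple_udf AS 'com.example.SimpleUDFExample'
        |USING JAR '/path/to/udf.jar'""".stripMargin)

    // IF EXISTS suppresses the error if the function is absent.
    sqlContext.sql("DROP FUNCTION IF EXISTS mydb.simple_udf")
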
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cd29def3be..69e3358d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -51,7 +51,12 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Internal catalog for managing table and database states.
*/
- lazy val catalog = new SessionCatalog(ctx.externalCatalog, functionRegistry, conf)
+ lazy val catalog =
+ new SessionCatalog(
+ ctx.externalCatalog,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
/**
* Interface exposed to the user for registering user-defined functions.
@@ -62,7 +67,7 @@ private[sql] class SessionState(ctx: SQLContext) {
* Logical query plan analyzer for resolving unresolved attributes and relations.
*/
lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
DataSourceAnalysis ::
@@ -98,5 +103,5 @@ private[sql] class SessionState(ctx: SQLContext) {
* Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
-
}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b727e88668..5a851b47ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -61,8 +61,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.filter(regex.matcher(_).matches()).map(Row(_))
}
checkAnswer(sql("SHOW functions"), getFunctions(".*"))
- Seq("^c.*", ".*e$", "log.*", ".*date.*").foreach { pattern =>
- checkAnswer(sql(s"SHOW FUNCTIONS '$pattern'"), getFunctions(pattern))
+ Seq("^c*", "*e$", "log*", "*date*").foreach { pattern =>
+ // For the pattern part, only '*' and '|' are allowed as wildcards.
+ // For '*', we need to replace it with '.*'.
+ checkAnswer(
+ sql(s"SHOW FUNCTIONS '$pattern'"),
+ getFunctions(pattern.replaceAll("\\*", ".*")))
}
}
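A self-contained sketch of the wildcard handling this test relies on: in a SHOW FUNCTIONS pattern only '*' and '|' act as wildcards, and '*' is translated to the regex '.*' before matching (the function names below are illustrative):

  // Convert a SHOW FUNCTIONS pattern into a Java regex the same way the test does,
  // then match it against a few names. '|' passes through as regex alternation.
  def functionPatternToRegex(pattern: String): String = pattern.replaceAll("\\*", ".*")

  val names = Seq("ceil", "coalesce", "current_date", "log", "log2")
  val matched = names.filter(_.matches(functionPatternToRegex("*date*|log*")))
  // matched == Seq("current_date", "log", "log2")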
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index fd736718af..ec950332c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -83,7 +83,8 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val e = intercept[AnalysisException] {
df.selectExpr("a_function_that_does_not_exist()")
}
- assert(e.getMessage.contains("undefined function"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("a_function_that_does_not_exist"))
}
test("Simple UDF") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 47e295a7e7..c42e8e7233 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -147,13 +147,13 @@ class DDLCommandSuite extends PlanTest {
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = CreateFunction(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
- isTemp = false)(sql2)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
@@ -173,22 +173,22 @@ class DDLCommandSuite extends PlanTest {
None,
"helloworld",
ifExists = false,
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = DropFunction(
None,
"helloworld",
ifExists = true,
- isTemp = true)(sql2)
+ isTemp = true)
val expected3 = DropFunction(
Some("hello"),
"world",
ifExists = false,
- isTemp = false)(sql3)
+ isTemp = false)
val expected4 = DropFunction(
Some("hello"),
"world",
ifExists = true,
- isTemp = false)(sql4)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 80a85a6615..7844d1b296 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.Filter
@@ -132,6 +133,27 @@ private[sql] trait SQLTestUtils
}
/**
+ * Drops functions after calling `f`. A function is represented by (functionName, isTemporary).
+ */
+ protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = {
+ try {
+ f
+ } catch {
+ case cause: Throwable => throw cause
+ } finally {
+ // If the test failed part way, we don't want to mask the failure by failing to remove
+ // functions that never got created.
+ try functions.foreach { case (functionName, isTemporary) =>
+ val withTemporary = if (isTemporary) "TEMPORARY" else ""
+ sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName")
+ assert(
+ !sqlContext.sessionState.catalog.functionExists(FunctionIdentifier(functionName)),
+ s"Function $functionName should have been dropped. But, it still exists.")
+ }
+ }
+ }
+
+ /**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
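A minimal usage sketch of the withUserDefinedFunction helper added above (the function name and UDF class are illustrative; the Hive UDF suites later in this patch use the same pattern):

  withUserDefinedFunction("my_upper" -> true) {  // true marks a temporary function
    sql("CREATE TEMPORARY FUNCTION my_upper AS " +
      "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper'")
    // ... run queries that use my_upper ...
  }
  // On exit the helper issues DROP TEMPORARY FUNCTION IF EXISTS my_upper and asserts
  // that the catalog no longer reports the function, even if the body failed part way.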
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 2c7358e59a..a1268b8e94 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -491,46 +491,50 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
test("SPARK-11595 ADD JAR with input path having URL scheme") {
withJdbcStatement { statement =>
- val jarPath = "../hive/src/test/resources/TestUDTF.jar"
- val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+ try {
+ val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+ val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
- Seq(
- s"ADD JAR $jarURL",
- s"""CREATE TEMPORARY FUNCTION udtf_count2
- |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- """.stripMargin
- ).foreach(statement.execute)
+ Seq(
+ s"ADD JAR $jarURL",
+ s"""CREATE TEMPORARY FUNCTION udtf_count2
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ """.stripMargin
+ ).foreach(statement.execute)
- val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+ val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
- assert(rs1.next())
- assert(rs1.getString(1) === "Function: udtf_count2")
+ assert(rs1.next())
+ assert(rs1.getString(1) === "Function: udtf_count2")
- assert(rs1.next())
- assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
- rs1.getString(1)
- }
+ assert(rs1.next())
+ assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+ rs1.getString(1)
+ }
- assert(rs1.next())
- assert(rs1.getString(1) === "Usage: To be added.")
+ assert(rs1.next())
+ assert(rs1.getString(1) === "Usage: To be added.")
- val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
+ val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
- Seq(
- s"CREATE TABLE test_udtf(key INT, value STRING)",
- s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
- ).foreach(statement.execute)
+ Seq(
+ s"CREATE TABLE test_udtf(key INT, value STRING)",
+ s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
+ ).foreach(statement.execute)
- val rs2 = statement.executeQuery(
- "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS cc")
+ val rs2 = statement.executeQuery(
+ "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS cc")
- assert(rs2.next())
- assert(rs2.getInt(1) === 97)
- assert(rs2.getInt(2) === 500)
+ assert(rs2.next())
+ assert(rs2.getInt(1) === 97)
+ assert(rs2.getInt(2) === 500)
- assert(rs2.next())
- assert(rs2.getInt(1) === 97)
- assert(rs2.getInt(2) === 500)
+ assert(rs2.next())
+ assert(rs2.getInt(1) === 97)
+ assert(rs2.getInt(2) === 500)
+ } finally {
+ statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+ }
}
}
@@ -565,24 +569,28 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
},
{ statement =>
- val rs1 = statement.executeQuery("SET foo")
+ try {
+ val rs1 = statement.executeQuery("SET foo")
- assert(rs1.next())
- assert(rs1.getString(1) === "foo")
- assert(rs1.getString(2) === "bar")
+ assert(rs1.next())
+ assert(rs1.getString(1) === "foo")
+ assert(rs1.getString(2) === "bar")
- val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+ val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
- assert(rs2.next())
- assert(rs2.getString(1) === "Function: udtf_count2")
+ assert(rs2.next())
+ assert(rs2.getString(1) === "Function: udtf_count2")
- assert(rs2.next())
- assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
- rs2.getString(1)
- }
+ assert(rs2.next())
+ assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+ rs2.getString(1)
+ }
- assert(rs2.next())
- assert(rs2.getString(1) === "Usage: To be added.")
+ assert(rs2.next())
+ assert(rs2.getString(1) === "Usage: To be added.")
+ } finally {
+ statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+ }
}
)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 11205ae67c..98a5998d03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -272,7 +272,12 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
override def createFunction(
db: String,
funcDefinition: CatalogFunction): Unit = withClient {
- client.createFunction(db, funcDefinition)
+ // Hive's metastore is case insensitive. However, Hive's createFunction does not
+ // normalize the function name (unlike getFunction does), so we normalize it here.
+ val functionName = funcDefinition.identifier.funcName.toLowerCase
+ val functionIdentifier = funcDefinition.identifier.copy(funcName = functionName)
+ client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
}
override def dropFunction(db: String, name: String): Unit = withClient {
@@ -283,10 +288,6 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
client.renameFunction(db, oldName, newName)
}
- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = withClient {
- client.alterFunction(db, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = withClient {
client.getFunction(db, funcName)
}
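A small sketch of the normalization above; FunctionIdentifier is the identifier type used throughout this patch, and the function name is illustrative:

  import org.apache.spark.sql.catalyst.FunctionIdentifier

  val original   = FunctionIdentifier("myUpper", Some("default"))
  val normalized = original.copy(funcName = original.funcName.toLowerCase)
  // normalized.funcName == "myupper", matching what Hive's getFunction returns later,
  // so a function created as "myUpper" can still be looked up after creation.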
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index dfbf22cc47..d315f39a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,27 +17,39 @@
package org.apache.spark.sql.hive
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
-class HiveSessionCatalog(
+private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
client: HiveClient,
context: HiveContext,
+ functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf)
- extends SessionCatalog(externalCatalog, functionRegistry, conf) {
+ extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
override def setCurrentDatabase(db: String): Unit = {
super.setCurrentDatabase(db)
@@ -112,4 +124,129 @@ class HiveSessionCatalog(
metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
}
+ override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+ makeFunctionBuilder(funcName, Utils.classForName(className))
+ }
+
+ /**
+ * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ */
+ private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+ // When we instantiate the Hive UDF wrapper class, an exception may be thrown if the
+ // input expressions don't satisfy the Hive UDF, e.g. a type mismatch or a wrong
+ // number of inputs. Here we catch the exception and throw an AnalysisException instead.
+ (children: Seq[Expression]) => {
+ try {
+ if (classOf[UDF].isAssignableFrom(clazz)) {
+ val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+ val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udf.dataType // Force it to check input data types.
+ udf
+ } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+ val udaf = HiveUDAFFunction(
+ name,
+ new HiveFunctionWrapper(clazz.getName),
+ children,
+ isUDAFBridgeRequired = true)
+ udaf.dataType // Force it to check input data types.
+ udaf
+ } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+ val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+ udtf.elementTypes // Force it to check input data types.
+ udtf
+ } else {
+ throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+ }
+ } catch {
+ case ae: AnalysisException =>
+ throw ae
+ case NonFatal(e) =>
+ val analysisException =
+ new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+ analysisException.setStackTrace(e.getStackTrace)
+ throw analysisException
+ }
+ }
+ }
+
+ // There is a list of Hive built-in functions that Spark does not natively support. For these,
+ // we check Hive's function registry and lazily load the needed functions into our own registry.
+ // Those Hive built-in functions are
+ // assert_true, collect_list, collect_set, compute_stats, context_ngrams, create_union,
+ // current_user ,elt, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+ // histogram_numeric, in_file, index, inline, java_method, map_keys, map_values,
+ // matchpath, ngrams, noop, noopstreaming, noopwithmap, noopwithmapstreaming,
+ // parse_url, parse_url_tuple, percentile, percentile_approx, posexplode, reflect, reflect2,
+ // regexp, sentences, stack, std, str_to_map, windowingtablefunction, xpath, xpath_boolean,
+ // xpath_double, xpath_float, xpath_int, xpath_long, xpath_number,
+ // xpath_short, and xpath_string.
+ override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
+ // if (super.functionExists(name)) {
+ // super.lookupFunction(name, children)
+ // } else {
+ // // This function is a Hive builtin function.
+ // ...
+ // }
+ Try(super.lookupFunction(name, children)) match {
+ case Success(expr) => expr
+ case Failure(error) =>
+ if (functionRegistry.functionExists(name)) {
+ // If the function actually exists in functionRegistry, it means that there is an
+ // error when we create the Expression using the given children.
+ // We need to throw the original exception.
+ throw error
+ } else {
+ // This function is not in functionRegistry, so let's try to load it as a Hive
+ // built-in function.
+ // Hive is case insensitive.
+ val functionName = name.toLowerCase
+ // TODO: This may not really work for current_user because current_user is not evaluated
+ // with session info.
+ // We do not need to use executionHive here because we only load
+ // Hive's built-in functions, which do not need the current database.
+ val functionInfo = {
+ try {
+ Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+ failFunctionLookup(name))
+ } catch {
+ // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+ // we are failing to load a Hive builtin function, which means that
+ // the given function is not a Hive builtin function.
+ case NonFatal(e) => failFunctionLookup(name)
+ }
+ }
+ val className = functionInfo.getFunctionClass.getName
+ val builder = makeFunctionBuilder(functionName, className)
+ // Put this Hive built-in function into our function registry.
+ val info = new ExpressionInfo(className, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(functionName, children)
+ }
+ }
+ }
+
+ // Pre-load a few commonly used Hive built-in functions.
+ HiveSessionCatalog.preloadedHiveBuiltinFunctions.foreach {
+ case (functionName, clazz) =>
+ val builder = makeFunctionBuilder(functionName, clazz)
+ val info = new ExpressionInfo(clazz.getCanonicalName, functionName)
+ createTempFunction(functionName, info, builder, ignoreIfExists = false)
+ }
+}
+
+private[sql] object HiveSessionCatalog {
+ // This is the list of commonly used Hive built-in functions that we want to
+ // pre-load when we create the FunctionRegistry.
+ val preloadedHiveBuiltinFunctions =
+ ("collect_set", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet]) ::
+ ("collect_list", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList]) :: Nil
}
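A hedged illustration of the lookup behavior implemented above (the src table and hiveContext come from the test suites in this patch; histogram_numeric is one of the Hive built-ins listed in the comment):

  // collect_set and collect_list are pre-registered when the catalog is constructed,
  // so they resolve without touching Hive's registry.
  hiveContext.sql("SELECT collect_set(key) FROM src GROUP BY value")

  // Other Hive built-ins are not in the FunctionRegistry yet: the first lookup falls
  // through to Hive's FunctionRegistry, registers a builder as a temporary function,
  // and then resolves the expression from our own registry.
  hiveContext.sql("SELECT histogram_numeric(key, 5) FROM src")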
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 829afa8432..cff24e28fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -36,25 +36,23 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
}
/**
- * Internal catalog for managing functions registered by the user.
- * Note that HiveUDFs will be overridden by functions registered in this context.
- */
- override lazy val functionRegistry: FunctionRegistry = {
- new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
- }
-
- /**
* Internal catalog for managing table and database states.
*/
override lazy val catalog = {
- new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, functionRegistry, conf)
+ new HiveSessionCatalog(
+ ctx.hiveCatalog,
+ ctx.metadataHive,
+ ctx,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
}
/**
* An analyzer that uses the Hive metastore.
*/
override lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a31178e347..1f66fbfd85 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,13 +21,14 @@ import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -37,6 +38,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog._
@@ -611,6 +613,9 @@ private[hive] class HiveClientImpl(
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+ val resourceUris = f.resources.map { case (resourceType, resourcePath) =>
+ new ResourceUri(ResourceType.valueOf(resourceType.toUpperCase), resourcePath)
+ }
new HiveFunction(
f.identifier.funcName,
db,
@@ -619,12 +624,21 @@ private[hive] class HiveClientImpl(
PrincipalType.USER,
(System.currentTimeMillis / 1000).toInt,
FunctionType.JAVA,
- List.empty[ResourceUri].asJava)
+ resourceUris.asJava)
}
private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
- new CatalogFunction(name, hf.getClassName)
+ val resources = hf.getResourceUris.asScala.map { uri =>
+ val resourceType = uri.getResourceType() match {
+ case ResourceType.ARCHIVE => "archive"
+ case ResourceType.FILE => "file"
+ case ResourceType.JAR => "jar"
+ case r => throw new AnalysisException(s"Unknown resource type: $r")
+ }
+ (resourceType, uri.getUri())
+ }
+ new CatalogFunction(name, hf.getClassName, resources)
}
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
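As a self-contained sketch of the resource-type mapping introduced above, simplified to plain strings (the real code goes through Hive's ResourceType enum as shown):

  // CatalogFunction keeps resources as (type, path) pairs with lower-case type names;
  // Hive stores them as ResourceUri objects keyed by an upper-case enum value.
  val toHiveType: String => String = _.toUpperCase              // "jar" -> "JAR"
  val fromHiveType: String => String = {
    case "ARCHIVE" => "archive"
    case "FILE"    => "file"
    case "JAR"     => "jar"
    case other     => sys.error(s"Unknown resource type: $other")  // mirrors the AnalysisException above
  }
  assert(fromHiveType(toHiveType("jar")) == "jar")              // the round trip is lossless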
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
index 55e69f99a4..c6c0b2ca59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala
@@ -21,13 +21,13 @@ import scala.collection.JavaConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.parse.EximUtil
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -278,19 +278,6 @@ class HiveSqlAstBuilder extends SparkSqlAstBuilder {
}
/**
- * Create a [[Generator]]. Override this method in order to support custom Generators.
- */
- override protected def withGenerator(
- name: String,
- expressions: Seq[Expression],
- ctx: LateralViewContext): Generator = {
- val info = Option(FunctionRegistry.getFunctionInfo(name.toLowerCase)).getOrElse {
- throw new ParseException(s"Couldn't find Generator function '$name'", ctx)
- }
- HiveGenericUDTF(name, new HiveFunctionWrapper(info.getFunctionClass.getName), expressions)
- }
-
- /**
* Create a [[HiveScriptIOSchema]].
*/
override protected def withScriptIOSchema(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 5ada3d5598..784b018353 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
import org.apache.hadoop.hive.ql.exec._
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
@@ -31,130 +30,14 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, O
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{analysis, InternalRow}
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.types._
-private[hive] class HiveFunctionRegistry(
- underlying: analysis.FunctionRegistry,
- executionHive: HiveClientImpl)
- extends analysis.FunctionRegistry with HiveInspectors {
-
- def getFunctionInfo(name: String): FunctionInfo = {
- // Hive Registry need current database to lookup function
- // TODO: the current database of executionHive should be consistent with metadataHive
- executionHive.withHiveState {
- FunctionRegistry.getFunctionInfo(name)
- }
- }
-
- override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- Try(underlying.lookupFunction(name, children)).getOrElse {
- // We only look it up to see if it exists, but do not include it in the HiveUDF since it is
- // not always serializable.
- val functionInfo: FunctionInfo =
- Option(getFunctionInfo(name.toLowerCase)).getOrElse(
- throw new AnalysisException(s"undefined function $name"))
-
- val functionClassName = functionInfo.getFunctionClass.getName
-
- // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions
- // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
- // catch the exception and throw AnalysisException instead.
- try {
- if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(
- name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udf = HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children)
- udf.dataType // Force it to check input data types.
- udf
- } else if (
- classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udaf = HiveUDAFFunction(
- name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
- udaf.dataType // Force it to check input data types.
- udaf
- } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children)
- udtf.elementTypes // Force it to check input data types.
- udtf
- } else {
- throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}")
- }
- } catch {
- case analysisException: AnalysisException =>
- // If the exception is an AnalysisException, just throw it.
- throw analysisException
- case throwable: Throwable =>
- // If there is any other error, we throw an AnalysisException.
- val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
- s"because: ${throwable.getMessage}."
- val analysisException = new AnalysisException(errorMessage)
- analysisException.setStackTrace(throwable.getStackTrace)
- throw analysisException
- }
- }
- }
-
- override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
- : Unit = underlying.registerFunction(name, info, builder)
-
- /* List all of the registered function names. */
- override def listFunction(): Seq[String] = {
- (FunctionRegistry.getFunctionNames.asScala ++ underlying.listFunction()).toList.sorted
- }
-
- /* Get the class of the registered function by specified name. */
- override def lookupFunction(name: String): Option[ExpressionInfo] = {
- underlying.lookupFunction(name).orElse(
- Try {
- val info = getFunctionInfo(name)
- val annotation = info.getFunctionClass.getAnnotation(classOf[Description])
- if (annotation != null) {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- annotation.name(),
- annotation.value(),
- annotation.extended()))
- } else {
- Some(new ExpressionInfo(
- info.getFunctionClass.getCanonicalName,
- name,
- null,
- null))
- }
- }.getOrElse(None))
- }
-
- override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = {
- underlying.lookupFunctionBuilder(name)
- }
-
- // Note: This does not drop functions stored in the metastore
- override def dropFunction(name: String): Boolean = {
- underlying.dropFunction(name)
- }
-
-}
-
private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9393302355..7f6ca21782 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -201,8 +201,13 @@ class TestHiveContext private[hive](
}
override lazy val functionRegistry = {
- new TestHiveFunctionRegistry(
- org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+ // We use TestHiveFunctionRegistry here to track functions that have been explicitly
+ // unregistered (through the TestHiveFunctionRegistry.unregisterFunction method).
+ val fr = new TestHiveFunctionRegistry
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
+ case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ }
+ fr
}
}
@@ -528,19 +533,18 @@ class TestHiveContext private[hive](
}
-private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)
- extends HiveFunctionRegistry(fr, client) {
+private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
private val removedFunctions =
collection.mutable.ArrayBuffer.empty[(String, (ExpressionInfo, FunctionBuilder))]
def unregisterFunction(name: String): Unit = {
- fr.functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
+ functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
}
def restore(): Unit = {
removedFunctions.foreach {
- case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ case (name, (info, builder)) => registerFunction(name, info, builder)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 53dec6348f..dd2129375d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -32,6 +32,8 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
@@ -55,6 +57,57 @@ class HiveSparkSubmitSuite
System.setProperty("spark.testing", "true")
}
+ test("temporary Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", TemporaryHiveUDFTest.getClass.getName.stripSuffix("$"),
+ "--name", "TemporaryHiveUDFTest",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: define a UDF and use it") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest1.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest1",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
+ test("permanent Hive UDF: use an already defined permanent function") {
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ val args = Seq(
+ "--class", PermanentHiveUDFTest2.getClass.getName.stripSuffix("$"),
+ "--name", "PermanentHiveUDFTest2",
+ "--master", "local-cluster[2,1,1024]",
+ "--conf", "spark.ui.enabled=false",
+ "--conf", "spark.master.rest.enabled=false",
+ "--driver-java-options", "-Dderby.system.durability=test",
+ "--jars", jarsString,
+ unusedJar.toString, "SparkSubmitClassA", "SparkSubmitClassB")
+ runSparkSubmit(args)
+ }
+
test("SPARK-8368: includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -208,6 +261,118 @@ class HiveSparkSubmitSuite
}
}
+// This application is used to test defining a new Hive UDF (with an associated jar)
+// and using this UDF. We need to run this test in a separate JVM to make sure we
+// can load the jar that defines the function.
+object TemporaryHiveUDFTest extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a temporary Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE TEMPORARY FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP temporary FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test defining a new Hive UDF (with an associated jar)
+// and using this UDF. We need to run this test in a separate JVM to make sure we
+// can load the jar that defines the function.
+object PermanentHiveUDFTest1 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+
+ // Load a Hive UDF from the jar.
+ logInfo("Registering a permanent Hive UDF provided in a jar.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ hiveContext.sql(
+ s"""
+ |CREATE FUNCTION example_max
+ |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+ |USING JAR '$jar'
+ """.stripMargin)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
+// This application is used to test that a pre-defined permanent function with jar
+// resources can be used. We need to run this test in a separate JVM to make sure we
+// can load the jar that defines the function.
+object PermanentHiveUDFTest2 extends Logging {
+ def main(args: Array[String]) {
+ Utils.configTestLog4j("INFO")
+ val conf = new SparkConf()
+ conf.set("spark.ui.enabled", "false")
+ val sc = new SparkContext(conf)
+ val hiveContext = new TestHiveContext(sc)
+ // Load a Hive UDF from the jar.
+ logInfo("Write the metadata of a permanent Hive UDF into metastore.")
+ val jar = hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath
+ val function = CatalogFunction(
+ FunctionIdentifier("example_max"),
+ "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
+ ("JAR" -> jar) :: Nil)
+ hiveContext.sessionState.catalog.createFunction(function)
+ val source =
+ hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
+ source.registerTempTable("sourceTable")
+ // Actually use the loaded UDF.
+ logInfo("Using the UDF.")
+ val result = hiveContext.sql(
+ "SELECT example_max(key) as key, val FROM sourceTable GROUP BY val")
+ logInfo("Running a simple query on the table.")
+ val count = result.orderBy("key", "val").count()
+ if (count != 10) {
+ throw new Exception(s"Result table should have 10 rows instead of $count rows")
+ }
+ hiveContext.sql("DROP FUNCTION example_max")
+ logInfo("Test finishes.")
+ sc.stop()
+ }
+}
+
// This object is used for testing SPARK-8368: https://issues.apache.org/jira/browse/SPARK-8368.
// We test if we can load user jars in both driver and executors when HiveContext is used.
object SparkSubmitClassLoaderTest extends Logging {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 3ab4576811..d1aa5aa931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -17,12 +17,51 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
case class FunctionResult(f1: String, f2: String)
-class UDFSuite extends QueryTest with TestHiveSingleton {
+/**
+ * A test suite for UDF-related functionality. Because the Hive metastore is
+ * case insensitive, the database and function names used here mix upper-case
+ * and lower-case letters.
+ */
+class UDFSuite
+ extends QueryTest
+ with SQLTestUtils
+ with TestHiveSingleton
+ with BeforeAndAfterEach {
+
+ import hiveContext.implicits._
+
+ private[this] val functionName = "myUPper"
+ private[this] val functionNameUpper = "MYUPPER"
+ private[this] val functionNameLower = "myupper"
+
+ private[this] val functionClass =
+ classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName
+
+ private var testDF: DataFrame = null
+ private[this] val testTableName = "testDF_UDFSuite"
+ private var expectedDF: DataFrame = null
+
+ override def beforeAll(): Unit = {
+ sql("USE default")
+
+ testDF = (1 to 10).map(i => s"sTr$i").toDF("value")
+ testDF.registerTempTable(testTableName)
+ expectedDF = (1 to 10).map(i => s"STR$i").toDF("value")
+ super.beforeAll()
+ }
+
+ override def afterEach(): Unit = {
+ sql("USE default")
+ super.afterEach()
+ }
test("UDF case insensitive") {
hiveContext.udf.register("random0", () => { Math.random() })
@@ -32,4 +71,128 @@ class UDFSuite extends QueryTest with TestHiveSingleton {
assert(hiveContext.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
assert(hiveContext.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
}
+
+ test("temporary function: create and drop") {
+ withUserDefinedFunction(functionName -> true) {
+ intercept[AnalysisException] {
+ sql(s"CREATE TEMPORARY FUNCTION default.$functionName AS '$functionClass'")
+ }
+ sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$functionClass'")
+ checkAnswer(
+ sql(s"SELECT $functionNameLower(value) from $testTableName"),
+ expectedDF
+ )
+ intercept[AnalysisException] {
+ sql(s"DROP TEMPORARY FUNCTION default.$functionName")
+ }
+ }
+ }
+
+ test("permanent function: create and drop without specifying db name") {
+ withUserDefinedFunction(functionName -> false) {
+ sql(s"CREATE FUNCTION $functionName AS '$functionClass'")
+ checkAnswer(
+ sql("SHOW functions like '.*upper'"),
+ Row(s"default.$functionNameLower")
+ )
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+ assert(
+ sql("SHOW functions").collect()
+ .map(_.getString(0))
+ .contains(s"default.$functionNameLower"))
+ }
+ }
+
+ test("permanent function: create and drop with a db name") {
+ // For this block, drop function command uses functionName as the function name.
+ withUserDefinedFunction(functionNameUpper -> false) {
+ sql(s"CREATE FUNCTION default.$functionName AS '$functionClass'")
+ // TODO: Re-enable it once we can distinguish qualified and unqualified function names
+ // in SessionCatalog.lookupFunction.
+ // checkAnswer(
+ // sql(s"SELECT default.myuPPer(value) from $testTableName"),
+ // expectedDF
+ // )
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+ checkAnswer(
+ sql(s"SELECT default.$functionName(value) from $testTableName"),
+ expectedDF
+ )
+ }
+
+ // For this block, drop function command uses default.functionName as the function name.
+ withUserDefinedFunction(s"DEfault.$functionNameLower" -> false) {
+ sql(s"CREATE FUNCTION dEFault.$functionName AS '$functionClass'")
+ checkAnswer(
+ sql(s"SELECT $functionNameUpper(value) from $testTableName"),
+ expectedDF
+ )
+ }
+ }
+
+ test("permanent function: create and drop a function in another db") {
+ // For this block, drop function command uses functionName as the function name.
+ withTempDatabase { dbName =>
+ withUserDefinedFunction(functionName -> false) {
+ sql(s"CREATE FUNCTION $dbName.$functionName AS '$functionClass'")
+ // TODO: Re-enable it once we can distinguish qualified and unqualified function names
+ // checkAnswer(
+ // sql(s"SELECT $dbName.myuPPer(value) from $testTableName"),
+ // expectedDF
+ // )
+
+ checkAnswer(
+ sql(s"SHOW FUNCTIONS like $dbName.$functionNameUpper"),
+ Row(s"$dbName.$functionNameLower")
+ )
+
+ sql(s"USE $dbName")
+
+ checkAnswer(
+ sql(s"SELECT $functionName(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE default")
+
+ checkAnswer(
+ sql(s"SELECT $dbName.$functionName(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE $dbName")
+ }
+
+ sql(s"USE default")
+
+ // For this block, drop function command uses default.functionName as the function name.
+ withUserDefinedFunction(s"$dbName.$functionNameUpper" -> false) {
+ sql(s"CREATE FUNCTION $dbName.$functionName AS '$functionClass'")
+ // TODO: Re-enable it once we can distinguish qualified and unqualified function names
+ // checkAnswer(
+ // sql(s"SELECT $dbName.myupper(value) from $testTableName"),
+ // expectedDF
+ // )
+
+ sql(s"USE $dbName")
+
+ assert(
+ sql("SHOW functions").collect()
+ .map(_.getString(0))
+ .contains(s"$dbName.$functionNameLower"))
+ checkAnswer(
+ sql(s"SELECT $functionNameLower(value) from $testTableName"),
+ expectedDF
+ )
+
+ sql(s"USE default")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index b951948fda..0c57ede9ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -62,7 +62,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TestHive.cacheTables = false
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
- sql("DROP TEMPORARY FUNCTION udtf_count2")
+ sql("DROP TEMPORARY FUNCTION IF EXISTS udtf_count2")
} finally {
super.afterAll()
}
@@ -1230,14 +1230,16 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
val e = intercept[AnalysisException] {
range(1).selectExpr("not_a_udf()")
}
- assert(e.getMessage.contains("undefined function not_a_udf"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("not_a_udf"))
var success = false
val t = new Thread("test") {
override def run(): Unit = {
val e = intercept[AnalysisException] {
range(1).selectExpr("not_a_udf()")
}
- assert(e.getMessage.contains("undefined function not_a_udf"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("not_a_udf"))
success = true
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index b0e263dff9..d07ac56586 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -303,7 +303,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDFTwoListList() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
}
@@ -313,7 +313,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDFAnd() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
}
@@ -323,7 +323,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
}
@@ -333,7 +333,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
}
@@ -343,7 +343,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
val message = intercept[AnalysisException] {
sql("SELECT testUDTFExplode() FROM testUDF")
}.getMessage
- assert(message.contains("No handler for Hive udf"))
+ assert(message.contains("No handler for Hive UDF"))
sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6199253d34..14a1d4cd30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
@@ -67,22 +68,43 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext.implicits._
test("UDTF") {
- sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
- // The function source code can be found at:
- // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
- sql(
- """
- |CREATE TEMPORARY FUNCTION udtf_count2
- |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
- """.stripMargin)
+ withUserDefinedFunction("udtf_count2" -> true) {
+ sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")
+ // The function source code can be found at:
+ // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
+ sql(
+ """
+ |CREATE TEMPORARY FUNCTION udtf_count2
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ """.stripMargin)
- checkAnswer(
- sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
- Row(97, 500) :: Row(97, 500) :: Nil)
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
- checkAnswer(
- sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
- Row(3) :: Row(3) :: Nil)
+ checkAnswer(
+ sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
+ }
+
+ test("permanent UDTF") {
+ withUserDefinedFunction("udtf_count_temp" -> false) {
+ sql(
+ s"""
+ |CREATE FUNCTION udtf_count_temp
+ |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+ |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+ """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"),
+ Row(97, 500) :: Row(97, 500) :: Nil)
+
+ checkAnswer(
+ sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+ Row(3) :: Row(3) :: Nil)
+ }
}
test("SPARK-6835: udtf in lateral view") {
@@ -169,9 +191,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("show functions") {
- val allBuiltinFunctions =
- (FunctionRegistry.builtin.listFunction().toSet[String] ++
- org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames.asScala).toList.sorted
+ val allBuiltinFunctions = FunctionRegistry.builtin.listFunction().toSet[String].toList.sorted
// The TestContext is shared by all the test cases, some functions may be registered before
// this, so we check that all the builtin functions are returned.
val allFunctions = sql("SHOW functions").collect().map(r => r(0))
@@ -183,11 +203,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkAnswer(sql("SHOW functions abc.abs"), Row("abs"))
checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs"))
- checkAnswer(sql("SHOW functions `~`"), Row("~"))
+ // TODO: Re-enable this test after we fix SPARK-14335.
+ // checkAnswer(sql("SHOW functions `~`"), Row("~"))
checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil)
- checkAnswer(sql("SHOW functions `weekofyea.*`"), Row("weekofyear"))
+ checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear"))
// this will probably fail if we add more functions prefixed with `sha`.
- checkAnswer(sql("SHOW functions `sha.*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
+ checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil)
+ // Test '|' for alternation.
+ checkAnswer(
+ sql("SHOW functions 'sha*|weekofyea*'"),
+ Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil)
}
test("describe functions") {
@@ -211,10 +236,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
checkExistence(sql("describe functioN abcadf"), true,
"Function: abcadf not found.")
- checkExistence(sql("describe functioN `~`"), true,
- "Function: ~",
- "Class: org.apache.hadoop.hive.ql.udf.UDFOPBitNot",
- "Usage: ~ n - Bitwise not")
+ // TODO: Re-enable this test after we fix SPARK-14335.
+ // checkExistence(sql("describe functioN `~`"), true,
+ // "Function: ~",
+ // "Class: org.apache.hadoop.hive.ql.udf.UDFOPBitNot",
+ // "Usage: ~ n - Bitwise not")
}
test("SPARK-5371: union with null and sum") {