aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2016-04-05 12:27:06 -0700
committerAndrew Or <andrew@databricks.com>2016-04-05 12:27:06 -0700
commit72544d6f2a72b9e44e0a32de1fb379e3342be5c3 (patch)
tree8767dc0a6e575dac7cbd87ccbb5c01f504d6b851 /sql/core
parentbc36df127d3b9f56b4edaeb5eca7697d4aef761a (diff)
downloadspark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.gz
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.tar.bz2
spark-72544d6f2a72b9e44e0a32de1fb379e3342be5c3.zip
[SPARK-14123][SPARK-14384][SQL] Handle CreateFunction/DropFunction
## What changes were proposed in this pull request? This PR implements CreateFunction and DropFunction commands. Besides implementing these two commands, we also change how to manage functions. Here are the main changes. * `FunctionRegistry` will be a container to store all functions builders and it will not actively load any functions. Because of this change, we do not need to maintain a separate registry for HiveContext. So, `HiveFunctionRegistry` is deleted. * SessionCatalog takes care the job of loading a function if this function is not in the `FunctionRegistry` but its metadata is stored in the external catalog. For this case, SessionCatalog will (1) load the metadata from the external catalog, (2) load all needed resources (i.e. jars and files), (3) create a function builder based on the function definition, (4) register the function builder in the `FunctionRegistry`. * A `UnresolvedGenerator` is created. So, the parser will not need to call `FunctionRegistry` directly during parsing, which is not a good time to create a Hive UDTF. In the analysis phase, we will resolve `UnresolvedGenerator`. This PR is based on viirya's https://github.com/apache/spark/pull/12036/ ## How was this patch tested? Existing tests and new tests. ## TODOs [x] Self-review [x] Cleanup [x] More tests for create/drop functions (we need to more tests for permanent functions). [ ] File JIRAs for all TODOs [x] Standardize the error message when a function does not exist. Author: Yin Huai <yhuai@databricks.com> Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #12117 from yhuai/function.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala18
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala29
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala114
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala22
10 files changed, 194 insertions, 50 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d4290fee0a..587ba1ea05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
@@ -208,6 +208,22 @@ class SQLContext private[sql](
sparkContext.addJar(path)
}
+ /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
+ @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = {
+ new FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ resource.resourceType match {
+ case JarResource => addJar(resource.uri)
+ case FileResource => sparkContext.addFile(resource.uri)
+ case ArchiveResource =>
+ throw new AnalysisException(
+ "Archive is not allowed to be loaded. If YARN mode is used, " +
+ "please use --archives options while calling spark-submit.")
+ }
+ }
+ }
+ }
+
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fb106d1aef..382cc61fac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -337,10 +337,9 @@ class SparkSqlAstBuilder extends AstBuilder {
CreateFunction(
database,
function,
- string(ctx.className), // TODO this is not an alias.
+ string(ctx.className),
resources,
- ctx.TEMPORARY != null)(
- command(ctx))
+ ctx.TEMPORARY != null)
}
/**
@@ -353,7 +352,7 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
val (database, function) = visitFunctionName(ctx.qualifiedName)
- DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+ DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a4be3bc333..faa7a2cdb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -426,8 +426,12 @@ case class ShowTablePropertiesCommand(
* A command for users to list all of the registered functions.
* The syntax of using this command in SQL is:
* {{{
- * SHOW FUNCTIONS
+ * SHOW FUNCTIONS [LIKE pattern]
* }}}
+ * For the pattern, '*' matches any sequence of characters (including no characters) and
+ * '|' is for alternation.
+ * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
+ *
* TODO currently we are simply ignore the db
*/
case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
@@ -438,18 +442,17 @@ case class ShowFunctions(db: Option[String], pattern: Option[String]) extends Ru
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = pattern match {
- case Some(p) =>
- try {
- val regex = java.util.regex.Pattern.compile(p)
- sqlContext.sessionState.functionRegistry.listFunction()
- .filter(regex.matcher(_).matches()).map(Row(_))
- } catch {
- // probably will failed in the regex that user provided, then returns empty row.
- case _: Throwable => Seq.empty[Row]
- }
- case None =>
- sqlContext.sessionState.functionRegistry.listFunction().map(Row(_))
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ // If pattern is not specified, we use '*', which is used to
+ // match any sequence of characters (including no characters).
+ val functionNames =
+ sqlContext.sessionState.catalog
+ .listFunctions(dbName, pattern.getOrElse("*"))
+ .map(_.unquotedString)
+ // The session catalog caches some persistent functions in the FunctionRegistry
+ // so there can be duplicates.
+ functionNames.distinct.sorted.map(Row(_))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index cd7e0eed65..6896881910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -175,26 +175,6 @@ case class DescribeDatabase(
}
}
-case class CreateFunction(
- databaseName: Option[String],
- functionName: String,
- alias: String,
- resources: Seq[(String, String)],
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-/**
- * The DDL command that drops a function.
- * ifExists: returns an error if the function doesn't exist, unless this is true.
- * isTemp: indicates if it is a temporary function.
- */
-case class DropFunction(
- databaseName: Option[String],
- functionName: String,
- ifExists: Boolean,
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
case class AlterTableRename(
oldName: TableIdentifier,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
new file mode 100644
index 0000000000..66d17e322e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+
+
+/**
+ * The DDL command that creates a function.
+ * To create a temporary function, the syntax of using this command in SQL is:
+ * {{{
+ * CREATE TEMPORARY FUNCTION functionName
+ * AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ *
+ * To create a permanent function, the syntax in SQL is:
+ * {{{
+ * CREATE FUNCTION [databaseName.]functionName
+ * AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ */
+// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
+case class CreateFunction(
+ databaseName: Option[String],
+ functionName: String,
+ className: String,
+ resources: Seq[(String, String)],
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when defining a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ // We first load resources and then put the builder in the function registry.
+ // Please note that it is allowed to overwrite an existing temp function.
+ sqlContext.sessionState.catalog.loadFunctionResources(resources)
+ val info = new ExpressionInfo(className, functionName)
+ val builder =
+ sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
+ sqlContext.sessionState.catalog.createTempFunction(
+ functionName, info, builder, ignoreIfExists = false)
+ } else {
+ // For a permanent, we will store the metadata into underlying external catalog.
+ // This function will be loaded into the FunctionRegistry when a query uses it.
+ // We do not load it into FunctionRegistry right now.
+ val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ val catalogFunc = CatalogFunction(func, className, resources)
+ if (sqlContext.sessionState.catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' already exists in database '$dbName'.")
+ }
+ sqlContext.sessionState.catalog.createFunction(catalogFunc)
+ }
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * The DDL command that drops a function.
+ * ifExists: returns an error if the function doesn't exist, unless this is true.
+ * isTemp: indicates if it is a temporary function.
+ */
+case class DropFunction(
+ databaseName: Option[String],
+ functionName: String,
+ ifExists: Boolean,
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when dropping a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ catalog.dropTempFunction(functionName, ifExists)
+ } else {
+ // We are dropping a permanent function.
+ val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ if (!ifExists && !catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' does not exist in database '$dbName'.")
+ }
+ catalog.dropFunction(func)
+ }
+ Seq.empty[Row]
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cd29def3be..69e3358d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -51,7 +51,12 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Internal catalog for managing table and database states.
*/
- lazy val catalog = new SessionCatalog(ctx.externalCatalog, functionRegistry, conf)
+ lazy val catalog =
+ new SessionCatalog(
+ ctx.externalCatalog,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
/**
* Interface exposed to the user for registering user-defined functions.
@@ -62,7 +67,7 @@ private[sql] class SessionState(ctx: SQLContext) {
* Logical query plan analyzer for resolving unresolved attributes and relations.
*/
lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
DataSourceAnalysis ::
@@ -98,5 +103,5 @@ private[sql] class SessionState(ctx: SQLContext) {
* Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
-
}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b727e88668..5a851b47ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -61,8 +61,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.filter(regex.matcher(_).matches()).map(Row(_))
}
checkAnswer(sql("SHOW functions"), getFunctions(".*"))
- Seq("^c.*", ".*e$", "log.*", ".*date.*").foreach { pattern =>
- checkAnswer(sql(s"SHOW FUNCTIONS '$pattern'"), getFunctions(pattern))
+ Seq("^c*", "*e$", "log*", "*date*").foreach { pattern =>
+ // For the pattern part, only '*' and '|' are allowed as wildcards.
+ // For '*', we need to replace it to '.*'.
+ checkAnswer(
+ sql(s"SHOW FUNCTIONS '$pattern'"),
+ getFunctions(pattern.replaceAll("\\*", ".*")))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index fd736718af..ec950332c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -83,7 +83,8 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val e = intercept[AnalysisException] {
df.selectExpr("a_function_that_does_not_exist()")
}
- assert(e.getMessage.contains("undefined function"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("a_function_that_does_not_exist"))
}
test("Simple UDF") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 47e295a7e7..c42e8e7233 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -147,13 +147,13 @@ class DDLCommandSuite extends PlanTest {
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = CreateFunction(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
- isTemp = false)(sql2)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
@@ -173,22 +173,22 @@ class DDLCommandSuite extends PlanTest {
None,
"helloworld",
ifExists = false,
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = DropFunction(
None,
"helloworld",
ifExists = true,
- isTemp = true)(sql2)
+ isTemp = true)
val expected3 = DropFunction(
Some("hello"),
"world",
ifExists = false,
- isTemp = false)(sql3)
+ isTemp = false)
val expected4 = DropFunction(
Some("hello"),
"world",
ifExists = true,
- isTemp = false)(sql4)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 80a85a6615..7844d1b296 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.Filter
@@ -132,6 +133,27 @@ private[sql] trait SQLTestUtils
}
/**
+ * Drops functions after calling `f`. A function is represented by (functionName, isTemporary).
+ */
+ protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = {
+ try {
+ f
+ } catch {
+ case cause: Throwable => throw cause
+ } finally {
+ // If the test failed part way, we don't want to mask the failure by failing to remove
+ // temp tables that never got created.
+ try functions.foreach { case (functionName, isTemporary) =>
+ val withTemporary = if (isTemporary) "TEMPORARY" else ""
+ sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName")
+ assert(
+ !sqlContext.sessionState.catalog.functionExists(FunctionIdentifier(functionName)),
+ s"Function $functionName should have been dropped. But, it still exists.")
+ }
+ }
+ }
+
+ /**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {