Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                         18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala            7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala          29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala               20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala        114
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala               11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                        8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala                             3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala   12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala                   22
10 files changed, 194 insertions, 50 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d4290fee0a..587ba1ea05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
@@ -208,6 +208,22 @@ class SQLContext private[sql](
sparkContext.addJar(path)
}
+ /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
+ @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = {
+ new FunctionResourceLoader {
+ override def loadResource(resource: FunctionResource): Unit = {
+ resource.resourceType match {
+ case JarResource => addJar(resource.uri)
+ case FileResource => sparkContext.addFile(resource.uri)
+ case ArchiveResource =>
+ throw new AnalysisException(
+ "Archive is not allowed to be loaded. If YARN mode is used, " +
+ "please use --archives options while calling spark-submit.")
+ }
+ }
+ }
+ }
+
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
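
The resource loader added above dispatches on the resource type: jars go through
SQLContext.addJar, plain files through SparkContext.addFile, and archives are rejected
with an AnalysisException. A minimal usage sketch, assuming a live SQLContext named
sqlContext, hypothetical resource paths, and that FunctionResource is a case class
pairing a resource type with a URI (as its use above suggests):

    import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}

    val loader = sqlContext.functionResourceLoader
    // Jar resources are forwarded to addJar (and from there to sparkContext.addJar).
    loader.loadResource(FunctionResource(JarResource, "/tmp/my-udfs.jar"))
    // File resources are forwarded to sparkContext.addFile.
    loader.loadResource(FunctionResource(FileResource, "/tmp/lookup-table.txt"))
    // loader.loadResource(FunctionResource(ArchiveResource, ...)) would throw.
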
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fb106d1aef..382cc61fac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -337,10 +337,9 @@ class SparkSqlAstBuilder extends AstBuilder {
CreateFunction(
database,
function,
- string(ctx.className), // TODO this is not an alias.
+ string(ctx.className),
resources,
- ctx.TEMPORARY != null)(
- command(ctx))
+ ctx.TEMPORARY != null)
}
/**
@@ -353,7 +352,7 @@ class SparkSqlAstBuilder extends AstBuilder {
*/
override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
val (database, function) = visitFunctionName(ctx.qualifiedName)
- DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)(command(ctx))
+ DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a4be3bc333..faa7a2cdb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -426,8 +426,12 @@ case class ShowTablePropertiesCommand(
* A command for users to list all of the registered functions.
* The syntax of using this command in SQL is:
* {{{
- * SHOW FUNCTIONS
+ * SHOW FUNCTIONS [LIKE pattern]
* }}}
+ * For the pattern, '*' matches any sequence of characters (including no characters) and
+ * '|' is for alternation.
+ * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
+ *
 * TODO: currently we simply ignore the db
*/
case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
@@ -438,18 +442,17 @@ case class ShowFunctions(db: Option[String], pattern: Option[String]) extends Ru
schema.toAttributes
}
- override def run(sqlContext: SQLContext): Seq[Row] = pattern match {
- case Some(p) =>
- try {
- val regex = java.util.regex.Pattern.compile(p)
- sqlContext.sessionState.functionRegistry.listFunction()
- .filter(regex.matcher(_).matches()).map(Row(_))
- } catch {
- // probably will failed in the regex that user provided, then returns empty row.
- case _: Throwable => Seq.empty[Row]
- }
- case None =>
- sqlContext.sessionState.functionRegistry.listFunction().map(Row(_))
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ // If pattern is not specified, we use '*', which is used to
+ // match any sequence of characters (including no characters).
+ val functionNames =
+ sqlContext.sessionState.catalog
+ .listFunctions(dbName, pattern.getOrElse("*"))
+ .map(_.unquotedString)
+ // The session catalog caches some persistent functions in the FunctionRegistry
+ // so there can be duplicates.
+ functionNames.distinct.sorted.map(Row(_))
}
}
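
The pattern semantics documented on ShowFunctions ('*' matches any sequence of
characters, '|' separates alternatives) are applied inside the catalog's
listFunctions. A simplified, standalone sketch of the matching rule (not the exact
helper Spark uses; it ignores regex metacharacters other than '*'):

    def matchesPattern(name: String, pattern: String): Boolean =
      pattern.split("\\|").exists { alternative =>
        // Each '*' becomes the regex '.*'; everything else matches literally.
        name.matches(alternative.replace("*", ".*"))
      }

    matchesPattern("year", "yea*|windo*")    // true
    matchesPattern("window", "yea*|windo*")  // true
    matchesPattern("month", "yea*|windo*")   // false
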
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index cd7e0eed65..6896881910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -175,26 +175,6 @@ case class DescribeDatabase(
}
}
-case class CreateFunction(
- databaseName: Option[String],
- functionName: String,
- alias: String,
- resources: Seq[(String, String)],
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
-/**
- * The DDL command that drops a function.
- * ifExists: returns an error if the function doesn't exist, unless this is true.
- * isTemp: indicates if it is a temporary function.
- */
-case class DropFunction(
- databaseName: Option[String],
- functionName: String,
- ifExists: Boolean,
- isTemp: Boolean)(sql: String)
- extends NativeDDLCommand(sql) with Logging
-
/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
case class AlterTableRename(
oldName: TableIdentifier,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
new file mode 100644
index 0000000000..66d17e322e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+
+
+/**
+ * The DDL command that creates a function.
+ * To create a temporary function, the syntax of using this command in SQL is:
+ * {{{
+ * CREATE TEMPORARY FUNCTION functionName
+ *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ *
+ * To create a permanent function, the syntax in SQL is:
+ * {{{
+ * CREATE FUNCTION [databaseName.]functionName
+ *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ */
+// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for resources.
+case class CreateFunction(
+ databaseName: Option[String],
+ functionName: String,
+ className: String,
+ resources: Seq[(String, String)],
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when defining a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ // We first load resources and then put the builder in the function registry.
+ // Please note that it is allowed to overwrite an existing temp function.
+ sqlContext.sessionState.catalog.loadFunctionResources(resources)
+ val info = new ExpressionInfo(className, functionName)
+ val builder =
+ sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
+ sqlContext.sessionState.catalog.createTempFunction(
+ functionName, info, builder, ignoreIfExists = false)
+ } else {
+ // For a permanent function, we store the metadata in the underlying external catalog.
+ // This function will be loaded into the FunctionRegistry when a query uses it.
+ // We do not load it into FunctionRegistry right now.
+ val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ val catalogFunc = CatalogFunction(func, className, resources)
+ if (sqlContext.sessionState.catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' already exists in database '$dbName'.")
+ }
+ sqlContext.sessionState.catalog.createFunction(catalogFunc)
+ }
+ Seq.empty[Row]
+ }
+}
+
+/**
+ * The DDL command that drops a function.
+ * ifExists: if true, do not throw an error when the function does not exist.
+ * isTemp: indicates if it is a temporary function.
+ */
+case class DropFunction(
+ databaseName: Option[String],
+ functionName: String,
+ ifExists: Boolean,
+ isTemp: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val catalog = sqlContext.sessionState.catalog
+ if (isTemp) {
+ if (databaseName.isDefined) {
+ throw new AnalysisException(
+ s"It is not allowed to provide database name when dropping a temporary function. " +
+ s"However, database name ${databaseName.get} is provided.")
+ }
+ catalog.dropTempFunction(functionName, ifExists)
+ } else {
+ // We are dropping a permanent function.
+ val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
+ val func = FunctionIdentifier(functionName, Some(dbName))
+ if (!ifExists && !catalog.functionExists(func)) {
+ throw new AnalysisException(
+ s"Function '$functionName' does not exist in database '$dbName'.")
+ }
+ catalog.dropFunction(func)
+ }
+ Seq.empty[Row]
+ }
+}
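
An end-to-end sketch of the two commands as a user would issue them (the UDF class,
jar path, and function name below are hypothetical):

    sqlContext.sql(
      """CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'
        |USING JAR '/tmp/my-udfs.jar'""".stripMargin)
    sqlContext.sql("SELECT my_upper('hello')").show()
    sqlContext.sql("DROP TEMPORARY FUNCTION IF EXISTS my_upper")

For the temporary case, CreateFunction loads the resources through the resource
loader and registers the builder immediately; for a permanent function it only writes
the metadata to the external catalog, deferring loading until the first query that
uses the function.
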
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cd29def3be..69e3358d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -51,7 +51,12 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Internal catalog for managing table and database states.
*/
- lazy val catalog = new SessionCatalog(ctx.externalCatalog, functionRegistry, conf)
+ lazy val catalog =
+ new SessionCatalog(
+ ctx.externalCatalog,
+ ctx.functionResourceLoader,
+ functionRegistry,
+ conf)
/**
* Interface exposed to the user for registering user-defined functions.
@@ -62,7 +67,7 @@ private[sql] class SessionState(ctx: SQLContext) {
* Logical query plan analyzer for resolving unresolved attributes and relations.
*/
lazy val analyzer: Analyzer = {
- new Analyzer(catalog, functionRegistry, conf) {
+ new Analyzer(catalog, conf) {
override val extendedResolutionRules =
PreInsertCastAndRename ::
DataSourceAnalysis ::
@@ -98,5 +103,5 @@ private[sql] class SessionState(ctx: SQLContext) {
* Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
-
}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b727e88668..5a851b47ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -61,8 +61,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.filter(regex.matcher(_).matches()).map(Row(_))
}
checkAnswer(sql("SHOW functions"), getFunctions(".*"))
- Seq("^c.*", ".*e$", "log.*", ".*date.*").foreach { pattern =>
- checkAnswer(sql(s"SHOW FUNCTIONS '$pattern'"), getFunctions(pattern))
+ Seq("^c*", "*e$", "log*", "*date*").foreach { pattern =>
+ // For the pattern part, only '*' and '|' are allowed as wildcards.
+ // For '*', we need to replace it with '.*'.
+ checkAnswer(
+ sql(s"SHOW FUNCTIONS '$pattern'"),
+ getFunctions(pattern.replaceAll("\\*", ".*")))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index fd736718af..ec950332c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -83,7 +83,8 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val e = intercept[AnalysisException] {
df.selectExpr("a_function_that_does_not_exist()")
}
- assert(e.getMessage.contains("undefined function"))
+ assert(e.getMessage.contains("Undefined function"))
+ assert(e.getMessage.contains("a_function_that_does_not_exist"))
}
test("Simple UDF") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 47e295a7e7..c42e8e7233 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -147,13 +147,13 @@ class DDLCommandSuite extends PlanTest {
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = CreateFunction(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
- isTemp = false)(sql2)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
}
@@ -173,22 +173,22 @@ class DDLCommandSuite extends PlanTest {
None,
"helloworld",
ifExists = false,
- isTemp = true)(sql1)
+ isTemp = true)
val expected2 = DropFunction(
None,
"helloworld",
ifExists = true,
- isTemp = true)(sql2)
+ isTemp = true)
val expected3 = DropFunction(
Some("hello"),
"world",
ifExists = false,
- isTemp = false)(sql3)
+ isTemp = false)
val expected4 = DropFunction(
Some("hello"),
"world",
ifExists = true,
- isTemp = false)(sql4)
+ isTemp = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 80a85a6615..7844d1b296 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.Filter
@@ -132,6 +133,27 @@ private[sql] trait SQLTestUtils
}
/**
+ * Drops functions after calling `f`. A function is represented by (functionName, isTemporary).
+ */
+ protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = {
+ try {
+ f
+ } catch {
+ case cause: Throwable => throw cause
+ } finally {
+ // If the test failed part way, we don't want to mask the failure by failing to remove
+ // functions that never got created.
+ try functions.foreach { case (functionName, isTemporary) =>
+ val withTemporary = if (isTemporary) "TEMPORARY" else ""
+ sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName")
+ assert(
+ !sqlContext.sessionState.catalog.functionExists(FunctionIdentifier(functionName)),
+ s"Function $functionName should have been dropped. But, it still exists.")
+ }
+ }
+ }
+
+ /**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
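
A usage sketch for the new withUserDefinedFunction helper (the function and class
names are hypothetical):

    withUserDefinedFunction("my_upper" -> true) {
      sqlContext.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'")
      // ... assertions that exercise my_upper ...
    }
    // At this point my_upper has been dropped, even if the body threw.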