author: Xiao Li <gatorsmile@gmail.com> 2017-04-17 09:50:20 -0700
committer: Xiao Li <gatorsmile@gmail.com> 2017-04-17 09:50:20 -0700
commit: 01ff0350a85b179715946c3bd4f003db7c5e3641
tree: 8c4a43ff6d217c1b4587c7b9d06afa0e7fa9b44a
parent: 24f09b39c7b947e52fda952676d5114c2540e732
[SPARK-20349][SQL] ListFunctions returns duplicate functions after using persistent functions
### What changes were proposed in this pull request?

The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle them. It is better for the `SessionCatalog` API to de-duplicate the records itself, instead of leaving that to each API caller. In the `FunctionRegistry`, functions are identified by unquoted strings, so this PR parses each name through our parser interface and then de-duplicates the resulting identifiers.

### How was this patch tested?

Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17646 from gatorsmile/showFunctions.
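A minimal, self-contained Scala sketch of the idea in the commit message, for readers who want it outside the diff below. The `FunctionIdentifier` case class and the toy `parseFunctionIdentifier` here are illustrative stand-ins for catalyst's `FunctionIdentifier` and the parser interface used in the patch; the sample function names are made up.

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for org.apache.spark.sql.catalyst.FunctionIdentifier.
case class FunctionIdentifier(funcName: String, database: Option[String] = None)

// Toy stand-in for parser.parseFunctionIdentifier: "db.fn" -> qualified name.
def parseFunctionIdentifier(s: String): FunctionIdentifier = s.split('.') match {
  case Array(db, fn) => FunctionIdentifier(fn, Some(db))
  case Array(fn) if fn.forall(c => c.isLetterOrDigit || c == '_') =>
    FunctionIdentifier(fn)
  case _ => throw new IllegalArgumentException(s"cannot parse: $s")
}

// The registry stores unquoted strings, while the external catalog returns
// qualified identifiers, so the same persistent UDF can appear in both forms.
val registryNames = Seq("default.myudf", "%") // cached persistent UDF + a built-in
val catalogFuncs  = Seq(FunctionIdentifier("myudf", Some("default")))

val loaded = registryNames.map { f =>
  Try(parseFunctionIdentifier(f)) match {
    case Success(id) => id
    case Failure(_)  => FunctionIdentifier(f) // names like "%" are not parsable
  }
}

// Parsing normalizes both representations, so distinct removes the duplicate:
val all = (catalogFuncs ++ loaded).distinct
// => List(FunctionIdentifier(myudf,Some(default)), FunctionIdentifier(%,None))
```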
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala  21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala  17
3 files changed, 34 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1417bccf65..3fbf83f3a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,6 +22,7 @@ import java.util.Locale
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
@@ -1202,15 +1203,25 @@ class SessionCatalog(
def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
val dbName = formatDatabaseName(db)
requireDbExists(dbName)
- val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
- .map { f => FunctionIdentifier(f, Some(dbName)) }
- val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
- .map { f => FunctionIdentifier(f) }
+ val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f =>
+ FunctionIdentifier(f, Some(dbName)) }
+ val loadedFunctions =
+ StringUtils.filterPattern(functionRegistry.listFunction(), pattern).map { f =>
+      // In functionRegistry, function names are stored in an unquoted format.
+ Try(parser.parseFunctionIdentifier(f)) match {
+ case Success(e) => e
+ case Failure(_) =>
+ // The names of some built-in functions are not parsable by our parser, e.g., %
+ FunctionIdentifier(f)
+ }
+ }
val functions = dbFunctions ++ loadedFunctions
+ // The session catalog caches some persistent functions in the FunctionRegistry
+ // so there can be duplicates.
functions.map {
case f if FunctionRegistry.functionSet.contains(f.funcName) => (f, "SYSTEM")
case f => (f, "USER")
- }
+ }.distinct
}
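As a hypothetical illustration of the classification step at the end of this hunk, reusing the sketch's stand-in `FunctionIdentifier` and a made-up built-in set in place of `FunctionRegistry.functionSet`:

```scala
case class FunctionIdentifier(funcName: String, database: Option[String] = None)

val builtIns = Set("%", "abs") // stand-in for FunctionRegistry.functionSet
val functions = Seq(
  FunctionIdentifier("abs"),
  FunctionIdentifier("myudf", Some("default")),
  FunctionIdentifier("myudf", Some("default")) // cached copy from the registry
)

// Built-ins are labeled SYSTEM, everything else USER; distinct then drops
// the duplicate produced by the FunctionRegistry cache.
val labeled = functions.map {
  case f if builtIns.contains(f.funcName) => (f, "SYSTEM")
  case f => (f, "USER")
}.distinct
// => List((FunctionIdentifier(abs,None),SYSTEM),
//         (FunctionIdentifier(myudf,Some(default)),USER))
```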
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index e0d0029369..545082324f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -207,8 +207,6 @@ case class ShowFunctionsCommand(
case (f, "USER") if showUserFunctions => f.unquotedString
case (f, "SYSTEM") if showSystemFunctions => f.unquotedString
}
- // The session catalog caches some persistent functions in the FunctionRegistry
- // so there can be duplicates.
- functionNames.distinct.sorted.map(Row(_))
+ functionNames.sorted.map(Row(_))
}
}
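The user-visible effect, as a hypothetical spark-shell session assuming a persistent UDF `default.myudf` has been created: with de-duplication now done in `SessionCatalog.listFunctions`, every caller, including `ShowFunctionsCommand` above, sees each name once.

```scala
// Hypothetical spark-shell session; `spark` is the usual SparkSession.
// Before the patch, the registry-cached copy of the persistent UDF could
// make the name appear twice; after it, each function is listed once.
spark.sql("SHOW FUNCTIONS LIKE 'myudf'").show()
// +-------------+
// |     function|
// +-------------+
// |default.myudf|
// +-------------+
```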
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 4bbf925919..4446af2e75 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -573,6 +573,23 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
checkAnswer(testData.selectExpr("statelessUDF() as s").agg(max($"s")), Row(1))
}
}
+
+ test("Show persistent functions") {
+ val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF()
+ withTempView("inputTable") {
+ testData.createOrReplaceTempView("inputTable")
+ withUserDefinedFunction("testUDFToListInt" -> false) {
+ val numFunc = spark.catalog.listFunctions().count()
+ sql(s"CREATE FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'")
+ assert(spark.catalog.listFunctions().count() == numFunc + 1)
+ checkAnswer(
+ sql("SELECT testUDFToListInt(s) FROM inputTable"),
+ Seq(Row(Seq(1, 2, 3))))
+ assert(sql("show functions").count() == numFunc + 1)
+ assert(spark.catalog.listFunctions().count() == numFunc + 1)
+ }
+ }
+ }
}
class TestPair(x: Int, y: Int) extends Writable with Serializable {