From 01ff0350a85b179715946c3bd4f003db7c5e3641 Mon Sep 17 00:00:00 2001 From: Xiao Li Date: Mon, 17 Apr 2017 09:50:20 -0700 Subject: [SPARK-20349][SQL] ListFunctions returns duplicate functions after using persistent functions ### What changes were proposed in this pull request? The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle it. It would be better if `SessionCatalog` API can de-duplciate the records, instead of doing it by each API caller. In `FunctionRegistry`, our functions are identified by the unquoted string. Thus, this PR is try to parse it using our parser interface and then de-duplicate the names. ### How was this patch tested? Added test cases. Author: Xiao Li Closes #17646 from gatorsmile/showFunctions. --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 21 ++++++++++++++++----- .../spark/sql/execution/command/functions.scala | 4 +--- .../spark/sql/hive/execution/HiveUDFSuite.scala | 17 +++++++++++++++++ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 1417bccf65..3fbf83f3a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,6 +22,7 @@ import java.util.Locale import javax.annotation.concurrent.GuardedBy import scala.collection.mutable +import scala.util.{Failure, Success, Try} import com.google.common.cache.{Cache, CacheBuilder} import org.apache.hadoop.conf.Configuration @@ -1202,15 +1203,25 @@ class SessionCatalog( def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = { val dbName = formatDatabaseName(db) requireDbExists(dbName) - val dbFunctions = externalCatalog.listFunctions(dbName, pattern) - .map { f => FunctionIdentifier(f, Some(dbName)) } - val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern) - .map { f => FunctionIdentifier(f) } + val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f => + FunctionIdentifier(f, Some(dbName)) } + val loadedFunctions = + StringUtils.filterPattern(functionRegistry.listFunction(), pattern).map { f => + // In functionRegistry, function names are stored as an unquoted format. + Try(parser.parseFunctionIdentifier(f)) match { + case Success(e) => e + case Failure(_) => + // The names of some built-in functions are not parsable by our parser, e.g., % + FunctionIdentifier(f) + } + } val functions = dbFunctions ++ loadedFunctions + // The session catalog caches some persistent functions in the FunctionRegistry + // so there can be duplicates. functions.map { case f if FunctionRegistry.functionSet.contains(f.funcName) => (f, "SYSTEM") case f => (f, "USER") - } + }.distinct } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index e0d0029369..545082324f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -207,8 +207,6 @@ case class ShowFunctionsCommand( case (f, "USER") if showUserFunctions => f.unquotedString case (f, "SYSTEM") if showSystemFunctions => f.unquotedString } - // The session catalog caches some persistent functions in the FunctionRegistry - // so there can be duplicates. - functionNames.distinct.sorted.map(Row(_)) + functionNames.sorted.map(Row(_)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 4bbf925919..4446af2e75 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -573,6 +573,23 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { checkAnswer(testData.selectExpr("statelessUDF() as s").agg(max($"s")), Row(1)) } } + + test("Show persistent functions") { + val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + withTempView("inputTable") { + testData.createOrReplaceTempView("inputTable") + withUserDefinedFunction("testUDFToListInt" -> false) { + val numFunc = spark.catalog.listFunctions().count() + sql(s"CREATE FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") + assert(spark.catalog.listFunctions().count() == numFunc + 1) + checkAnswer( + sql("SELECT testUDFToListInt(s) FROM inputTable"), + Seq(Row(Seq(1, 2, 3)))) + assert(sql("show functions").count() == numFunc + 1) + assert(spark.catalog.listFunctions().count() == numFunc + 1) + } + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { -- cgit v1.2.3