author:    Andrew Or <andrew@databricks.com>    2016-04-07 16:23:17 -0700
committer: Yin Huai <yhuai@databricks.com>      2016-04-07 16:23:17 -0700
commit:    ae1db91d158d1ae62a0ab7ea74467679ca050101
tree:      eea4630e6ac9b3b2a2a9b17b2ef510f39ab0e954 /sql/hive/src
parent:    aa852215f82876977d164f371627e894e86baacc
[SPARK-14410][SQL] Push functions existence check into catalog
## What changes were proposed in this pull request?
This is a follow-up to #12117 and addresses some of the TODOs introduced there. In particular, database name resolution is now pushed into the session catalog, which knows about the current database, and the logic for checking whether a function exists is pushed into the external catalog.
No change in functionality is expected.
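As a rough illustration of the new split, the sketch below uses hypothetical, simplified stand-ins (`ExternalCatalog`, `InMemoryCatalog`, `SessionCatalog` here are not the real Spark traits, which have many more members): the session catalog owns database resolution, while the external catalog answers the existence question.

```scala
// Simplified stand-ins, not the actual Spark interfaces.
trait ExternalCatalog {
  // After this change, the existence check lives at the external catalog level.
  def functionExists(db: String, funcName: String): Boolean
}

class InMemoryCatalog extends ExternalCatalog {
  private val functions = scala.collection.mutable.Set.empty[(String, String)]
  def register(db: String, funcName: String): Unit = functions += (db -> funcName)
  override def functionExists(db: String, funcName: String): Boolean =
    functions.contains(db -> funcName)
}

// The session catalog resolves an optional database against the current one,
// then delegates the check to the external catalog.
class SessionCatalog(external: ExternalCatalog, var currentDb: String = "default") {
  def functionExists(db: Option[String], funcName: String): Boolean =
    external.functionExists(db.getOrElse(currentDb), funcName)
}

object FunctionExistsSketch extends App {
  val external = new InMemoryCatalog
  external.register("default", "example_max")
  val catalog = new SessionCatalog(external)
  println(catalog.functionExists(None, "example_max"))          // true, resolved to "default"
  println(catalog.functionExists(Some("other"), "example_max")) // false
}
```

The point of the split is that callers no longer need to resolve the database themselves before asking whether a function exists.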
## How was this patch tested?
`SessionCatalogSuite`, `DDLSuite`
Author: Andrew Or <andrew@databricks.com>
Closes #12198 from andrewor14/function-exists.
Diffstat (limited to 'sql/hive/src')
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala       |  4 ++++
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala         |  5 +++++
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala     | 10 +++++++---
 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala |  8 +++++---
 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala      |  2 +-
5 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 98a5998d03..b1156fb3e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -292,6 +292,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
     client.getFunction(db, funcName)
   }
 
+  override def functionExists(db: String, funcName: String): Boolean = withClient {
+    client.functionExists(db, funcName)
+  }
+
   override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
     client.listFunctions(db, pattern)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee56f9d75d..94794b1572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -232,6 +232,11 @@ private[hive] trait HiveClient {
   /** Return an existing function in the database, or None if it doesn't exist. */
   def getFunctionOption(db: String, name: String): Option[CatalogFunction]
 
+  /** Return whether a function exists in the specified database. */
+  final def functionExists(db: String, name: String): Boolean = {
+    getFunctionOption(db, name).isDefined
+  }
+
   /** Return the names of all functions that match the given pattern in the database. */
   def listFunctions(db: String, pattern: String): Seq[String]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1f66fbfd85..d0eb9ddf50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintStream}
 
 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -30,7 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -559,7 +559,11 @@ private[hive] class HiveClientImpl(
   override def getFunctionOption(
       db: String,
       name: String): Option[CatalogFunction] = withHiveState {
-    Option(client.getFunction(db, name)).map(fromHiveFunction)
+    try {
+      Option(client.getFunction(db, name)).map(fromHiveFunction)
+    } catch {
+      case he: HiveException => None
+    }
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 6967395613..ada8621d07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           .saveAsTable("t")
       }
 
-      val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+      val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
       assert(hiveTable.storage.inputFormat === Some(inputFormat))
       assert(hiveTable.storage.outputFormat === Some(outputFormat))
       assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
           .saveAsTable("t")
       }
 
-      val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+      val hiveTable =
+        sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
       assert(hiveTable.storage.inputFormat === Some(inputFormat))
       assert(hiveTable.storage.outputFormat === Some(outputFormat))
       assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +145,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
             |AS SELECT 1 AS d1, "val_1" AS d2
           """.stripMargin)
 
-        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+        val hiveTable =
+          sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd2129375d..c5417b06a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -354,7 +354,7 @@ object PermanentHiveUDFTest2 extends Logging {
       FunctionIdentifier("example_max"),
       "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
       ("JAR" -> jar) :: Nil)
-    hiveContext.sessionState.catalog.createFunction(function)
+    hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
     val source = hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
     source.registerTempTable("sourceTable")
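For readers skimming the diff, the `HiveClient` change boils down to two pieces: a trait-level `final` default so every client implementation shares the same existence semantics, and an exception-to-`None` lookup so a failed metastore call means "absent" rather than an error. The following is a minimal, self-contained sketch of that pattern using hypothetical stand-in types (`CatalogFunction` and `FlakyLookup` are simplified placeholders, and `RuntimeException` stands in for Hive's `HiveException`), not the real Spark or Hive classes.

```scala
// Hypothetical, trimmed-down stand-ins for illustration only.
case class CatalogFunction(db: String, name: String, className: String)

trait FunctionLookup {
  // Implementations map internal lookup failures to None.
  def getFunctionOption(db: String, name: String): Option[CatalogFunction]

  // Shared default: "exists" simply means "the lookup returned something",
  // so every implementation gets the existence check for free.
  final def functionExists(db: String, name: String): Boolean =
    getFunctionOption(db, name).isDefined
}

class FlakyLookup extends FunctionLookup {
  override def getFunctionOption(db: String, name: String): Option[CatalogFunction] =
    try {
      if (name == "boom") throw new RuntimeException("metastore failure")
      if (name == "example_max") Some(CatalogFunction(db, name, "com.example.Max")) else None
    } catch {
      // Mirrors the HiveException handling in HiveClientImpl: a failed lookup means "absent".
      case _: RuntimeException => None
    }
}

object FunctionLookupDemo extends App {
  val client = new FlakyLookup
  println(client.functionExists("default", "example_max")) // true
  println(client.functionExists("default", "boom"))        // false, no exception escapes
}
```

Making the default method `final` keeps the "exists = lookup succeeded" contract consistent across all client implementations, which is why `HiveExternalCatalog.functionExists` can delegate to it directly.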