13 files changed, 126 insertions, 114 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5d136b663f..186bbccef1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -47,11 +47,6 @@ class InMemoryCatalog extends ExternalCatalog {
   // Database name -> description
   private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]

-  private def functionExists(db: String, funcName: String): Boolean = {
-    requireDbExists(db)
-    catalog(db).functions.contains(funcName)
-  }
-
   private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
     requireTableExists(db, table)
     catalog(db).tables(table).partitions.contains(spec)
@@ -315,6 +310,11 @@ class InMemoryCatalog extends ExternalCatalog {
     catalog(db).functions(funcName)
   }

+  override def functionExists(db: String, funcName: String): Boolean = {
+    requireDbExists(db)
+    catalog(db).functions.contains(funcName)
+  }
+
   override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
     requireDbExists(db)
     StringUtils.filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2acf584e8f..7db9fd0527 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -95,7 +95,7 @@ class SessionCatalog(
     externalCatalog.alterDatabase(dbDefinition)
   }

-  def getDatabase(db: String): CatalogDatabase = {
+  def getDatabaseMetadata(db: String): CatalogDatabase = {
     externalCatalog.getDatabase(db)
   }

@@ -169,7 +169,7 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    * If the specified table is not found in the database then an [[AnalysisException]] is thrown.
    */
-  def getTable(name: TableIdentifier): CatalogTable = {
+  def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = name.database.getOrElse(currentDb)
     val table = formatTableName(name.table)
     externalCatalog.getTable(db, table)
@@ -435,28 +435,37 @@ class SessionCatalog(
    * Create a metastore function in the database specified in `funcDefinition`.
    * If no such database is specified, create it in the current database.
    */
-  def createFunction(funcDefinition: CatalogFunction): Unit = {
+  def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
     val db = funcDefinition.identifier.database.getOrElse(currentDb)
-    val newFuncDefinition = funcDefinition.copy(
-      identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
-    externalCatalog.createFunction(db, newFuncDefinition)
+    val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
+    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
+    if (!functionExists(identifier)) {
+      externalCatalog.createFunction(db, newFuncDefinition)
+    } else if (!ignoreIfExists) {
+      throw new AnalysisException(s"function '$identifier' already exists in database '$db'")
+    }
   }

   /**
    * Drop a metastore function.
    * If no database is specified, assume the function is in the current database.
    */
-  def dropFunction(name: FunctionIdentifier): Unit = {
+  def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
     val db = name.database.getOrElse(currentDb)
-    val qualified = name.copy(database = Some(db)).unquotedString
-    if (functionRegistry.functionExists(qualified)) {
-      // If we have loaded this function into the FunctionRegistry,
-      // also drop it from there.
-      // For a permanent function, because we loaded it to the FunctionRegistry
-      // when it's first used, we also need to drop it from the FunctionRegistry.
-      functionRegistry.dropFunction(qualified)
+    val identifier = name.copy(database = Some(db))
+    if (functionExists(identifier)) {
+      // TODO: registry should just take in FunctionIdentifier for type safety
+      if (functionRegistry.functionExists(identifier.unquotedString)) {
+        // If we have loaded this function into the FunctionRegistry,
+        // also drop it from there.
+        // For a permanent function, because we loaded it to the FunctionRegistry
+        // when it's first used, we also need to drop it from the FunctionRegistry.
+        functionRegistry.dropFunction(identifier.unquotedString)
+      }
+      externalCatalog.dropFunction(db, name.funcName)
+    } else if (!ignoreIfNotExists) {
+      throw new AnalysisException(s"function '$identifier' does not exist in database '$db'")
     }
-    externalCatalog.dropFunction(db, name.funcName)
   }

   /**
@@ -465,8 +474,7 @@ class SessionCatalog(
    * If a database is specified in `name`, this will return the function in that database.
    * If no database is specified, this will return the function in the current database.
    */
-  // TODO: have a better name. This method is actually for fetching the metadata of a function.
-  def getFunction(name: FunctionIdentifier): CatalogFunction = {
+  def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
     val db = name.database.getOrElse(currentDb)
     externalCatalog.getFunction(db, name.funcName)
   }

@@ -475,20 +483,9 @@ class SessionCatalog(
    * Check if the specified function exists.
    */
   def functionExists(name: FunctionIdentifier): Boolean = {
-    if (functionRegistry.functionExists(name.unquotedString)) {
-      // This function exists in the FunctionRegistry.
-      true
-    } else {
-      // Need to check if this function exists in the metastore.
-      try {
-        // TODO: It's better to ask external catalog if this function exists.
-        // So, we can avoid of having this hacky try/catch block.
-        getFunction(name) != null
-      } catch {
-        case _: NoSuchFunctionException => false
-        case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
-      }
-    }
+    val db = name.database.getOrElse(currentDb)
+    functionRegistry.functionExists(name.unquotedString) ||
+      externalCatalog.functionExists(db, name.funcName)
   }

   // ----------------------------------------------------------------
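Note: the behavioral core of the patch is above. createFunction and dropFunction now perform their own existence checks, and functionExists consults the FunctionRegistry and then the external catalog directly instead of probing getFunction for exceptions. A minimal sketch of the new call pattern against an InMemoryCatalog; the database bootstrap, UDF name, and class name below are illustrative assumptions, not part of the patch:

    import org.apache.spark.sql.catalyst.FunctionIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogFunction, InMemoryCatalog, SessionCatalog}

    val catalog = new SessionCatalog(new InMemoryCatalog)
    catalog.createDatabase(
      CatalogDatabase("default", "", "file:/tmp/warehouse", Map.empty),  // hypothetical location
      ignoreIfExists = true)

    val fn = CatalogFunction(
      FunctionIdentifier("my_udf", Some("default")),   // hypothetical function name
      "com.example.MyUDF",                             // hypothetical UDF class
      Seq.empty[(String, String)])

    catalog.createFunction(fn, ignoreIfExists = false) // creates the function
    catalog.createFunction(fn, ignoreIfExists = true)  // silent no-op now that it exists

    // Registry first, then the external catalog; no try/catch probing.
    assert(catalog.functionExists(FunctionIdentifier("my_udf", Some("default"))))

    catalog.dropFunction(FunctionIdentifier("my_udf", Some("default")), ignoreIfNotExists = false)
    catalog.dropFunction(FunctionIdentifier("my_udf", Some("default")), ignoreIfNotExists = true)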
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 97b9946140..e29d6bd8b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -152,6 +152,8 @@ abstract class ExternalCatalog {

   def getFunction(db: String, funcName: String): CatalogFunction

+  def functionExists(db: String, funcName: String): Boolean
+
   def listFunctions(db: String, pattern: String): Seq[String]
 }
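Note: with functionExists now abstract on ExternalCatalog, every implementation must answer existence directly, and SessionCatalog no longer infers it from a thrown NoSuchFunctionException. A sketch of the invariant an implementation is expected to uphold; the helper itself is hypothetical, not part of the patch:

    import scala.util.Try
    import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

    // functionExists(db, fn) should agree with whether getFunction(db, fn) succeeds.
    def existsAgreesWithGet(catalog: ExternalCatalog, db: String, fn: String): Boolean =
      catalog.functionExists(db, fn) == Try(catalog.getFunction(db, fn)).isSuccess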
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d56d001b3..1850dc8156 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -62,7 +62,7 @@ class SessionCatalogSuite extends SparkFunSuite {

   test("get database when a database exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabase("db1")
+    val db1 = catalog.getDatabaseMetadata("db1")
     assert(db1.name == "db1")
     assert(db1.description.contains("db1"))
   }
@@ -70,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("get database should throw exception when the database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getDatabase("db_that_does_not_exist")
+      catalog.getDatabaseMetadata("db_that_does_not_exist")
     }
   }

@@ -128,10 +128,10 @@ class SessionCatalogSuite extends SparkFunSuite {

   test("alter database") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val db1 = catalog.getDatabase("db1")
+    val db1 = catalog.getDatabaseMetadata("db1")
     // Note: alter properties here because Hive does not support altering other fields
     catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
-    val newDb1 = catalog.getDatabase("db1")
+    val newDb1 = catalog.getDatabaseMetadata("db1")
     assert(db1.properties.isEmpty)
     assert(newDb1.properties.size == 2)
     assert(newDb1.properties.get("k") == Some("v3"))
@@ -346,21 +346,21 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("get table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2")))
+    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
       == externalCatalog.getTable("db2", "tbl1"))
     // Get table without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    assert(sessionCatalog.getTable(TableIdentifier("tbl1"))
+    assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1"))
       == externalCatalog.getTable("db2", "tbl1"))
   }

   test("get table when database/table does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getTable(TableIdentifier("tbl1", Some("unknown_db")))
+      catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db")))
     }
     intercept[AnalysisException] {
-      catalog.getTable(TableIdentifier("unknown_table", Some("db2")))
+      catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2")))
     }
   }

@@ -386,7 +386,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("lookup table relation with alias") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val alias = "monster"
-    val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2")))
+    val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
     val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata))
     val relationWithAlias =
       SubqueryAlias(alias,
@@ -659,26 +659,28 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newEmptyCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
-    sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")))
+    sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
     // Create function without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createFunction(newFunc("myfunc2"))
+    sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2"))
   }

   test("create function when database does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.createFunction(newFunc("func5", Some("does_not_exist")))
+      catalog.createFunction(
+        newFunc("func5", Some("does_not_exist")), ignoreIfExists = false)
     }
   }

   test("create function that already exists") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.createFunction(newFunc("func1", Some("db2")))
+      catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false)
     }
+    catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true)
   }

   test("create temp function") {
@@ -711,24 +713,27 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
-    sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2")))
+    sessionCatalog.dropFunction(
+      FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false)
     assert(externalCatalog.listFunctions("db2", "*").isEmpty)
     // Drop function without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.createFunction(newFunc("func2", Some("db2")))
+    sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
     assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2"))
-    sessionCatalog.dropFunction(FunctionIdentifier("func2"))
+    sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false)
     assert(externalCatalog.listFunctions("db2", "*").isEmpty)
   }

   test("drop function when database/function does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist")))
+      catalog.dropFunction(
+        FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false)
     }
     intercept[AnalysisException] {
-      catalog.dropFunction(FunctionIdentifier("does_not_exist"))
+      catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false)
     }
+    catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true)
   }

   test("drop temp function") {
@@ -753,19 +758,19 @@ class SessionCatalogSuite extends SparkFunSuite {
     val expected =
       CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
         Seq.empty[(String, String)])
-    assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected)
+    assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected)
     // Get function without explicitly specifying database
     catalog.setCurrentDatabase("db2")
-    assert(catalog.getFunction(FunctionIdentifier("func1")) == expected)
+    assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected)
   }

   test("get function when database/function does not exist") {
     val catalog = new SessionCatalog(newBasicCatalog())
     intercept[AnalysisException] {
-      catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist")))
+      catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist")))
     }
     intercept[AnalysisException] {
-      catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2")))
+      catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2")))
     }
   }

@@ -787,8 +792,8 @@ class SessionCatalogSuite extends SparkFunSuite {
     val info2 = new ExpressionInfo("tempFunc2", "yes_me")
     val tempFunc1 = (e: Seq[Expression]) => e.head
     val tempFunc2 = (e: Seq[Expression]) => e.last
-    catalog.createFunction(newFunc("func2", Some("db2")))
-    catalog.createFunction(newFunc("not_me", Some("db2")))
+    catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false)
+    catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false)
     catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false)
     catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false)
     assert(catalog.listFunctions("db1", "*").toSet ==
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index faa7a2cdb4..3fd2a93d29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -407,7 +407,7 @@ case class ShowTablePropertiesCommand(
     if (catalog.isTemporaryTable(table)) {
       Seq.empty[Row]
     } else {
-      val catalogTable = sqlContext.sessionState.catalog.getTable(table)
+      val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table)

       propertyKey match {
         case Some(p) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6d56a6fec8..20779d68e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -124,7 +124,7 @@ case class AlterDatabaseProperties(

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val db: CatalogDatabase = catalog.getDatabase(databaseName)
+    val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
     catalog.alterDatabase(db.copy(properties = db.properties ++ props))

     Seq.empty[Row]
@@ -149,7 +149,8 @@ case class DescribeDatabase(
   extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbMetadata: CatalogDatabase = sqlContext.sessionState.catalog.getDatabase(databaseName)
+    val dbMetadata: CatalogDatabase =
+      sqlContext.sessionState.catalog.getDatabaseMetadata(databaseName)
     val result =
       Row("Database Name", dbMetadata.name) ::
         Row("Description", dbMetadata.description) ::
@@ -213,7 +214,7 @@ case class AlterTableSetProperties(

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     val newProperties = table.properties ++ properties
     if (DDLUtils.isDatasourceTable(newProperties)) {
       throw new AnalysisException(
@@ -243,7 +244,7 @@ case class AlterTableUnsetProperties(

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     if (DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
         "alter table properties is not supported for datasource tables")
@@ -286,7 +287,7 @@ case class AlterTableSerDeProperties(

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     // Do not support setting serde for datasource tables
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException(
@@ -376,7 +377,7 @@ case class AlterTableSetLocation(

   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val table = catalog.getTable(tableName)
+    val table = catalog.getTableMetadata(tableName)
     partitionSpec match {
       case Some(spec) =>
         // Partition spec is specified, so we set the location only for this partition
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 66d17e322e..c6e601799f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -47,6 +47,7 @@ case class CreateFunction(
   extends RunnableCommand {

   override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
     if (isTemp) {
       if (databaseName.isDefined) {
         throw new AnalysisException(
@@ -55,24 +56,18 @@ case class CreateFunction(
       }
       // We first load resources and then put the builder in the function registry.
       // Please note that it is allowed to overwrite an existing temp function.
-      sqlContext.sessionState.catalog.loadFunctionResources(resources)
+      catalog.loadFunctionResources(resources)
       val info = new ExpressionInfo(className, functionName)
-      val builder =
-        sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, className)
-      sqlContext.sessionState.catalog.createTempFunction(
-        functionName, info, builder, ignoreIfExists = false)
+      val builder = catalog.makeFunctionBuilder(functionName, className)
+      catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false)
     } else {
       // For a permanent, we will store the metadata into underlying external catalog.
       // This function will be loaded into the FunctionRegistry when a query uses it.
       // We do not load it into FunctionRegistry right now.
-      val dbName = databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
-      val func = FunctionIdentifier(functionName, Some(dbName))
-      val catalogFunc = CatalogFunction(func, className, resources)
-      if (sqlContext.sessionState.catalog.functionExists(func)) {
-        throw new AnalysisException(
-          s"Function '$functionName' already exists in database '$dbName'.")
-      }
-      sqlContext.sessionState.catalog.createFunction(catalogFunc)
+      // TODO: should we also parse "IF NOT EXISTS"?
+      catalog.createFunction(
+        CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources),
+        ignoreIfExists = false)
     }
     Seq.empty[Row]
   }
@@ -101,13 +96,9 @@ case class DropFunction(
       catalog.dropTempFunction(functionName, ifExists)
     } else {
       // We are dropping a permanent function.
-      val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
-      val func = FunctionIdentifier(functionName, Some(dbName))
-      if (!ifExists && !catalog.functionExists(func)) {
-        throw new AnalysisException(
-          s"Function '$functionName' does not exist in database '$dbName'.")
-      }
-      catalog.dropFunction(func)
+      catalog.dropFunction(
+        FunctionIdentifier(functionName, databaseName),
+        ignoreIfNotExists = ifExists)
     }
     Seq.empty[Row]
   }
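Note: at the SQL level both commands now delegate their existence checks to SessionCatalog. A sketch of the user-visible behavior, assuming a Hive-enabled SQLContext and a hypothetical UDF class on the classpath:

    // Creating the same permanent function twice still fails, but the check now
    // lives in SessionCatalog.createFunction (the command passes ignoreIfExists = false).
    sqlContext.sql("CREATE FUNCTION db1.my_udf AS 'com.example.MyUDF'")

    // DROP FUNCTION maps IF EXISTS onto ignoreIfNotExists = true, so dropping a
    // missing function is a no-op instead of an AnalysisException.
    sqlContext.sql("DROP FUNCTION IF EXISTS db1.my_udf")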
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a8db4e9923..7084665b3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -88,7 +88,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val dbNameWithoutBackTicks = cleanIdentifier(dbName)

       sql(s"CREATE DATABASE $dbName")
-      val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+      val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
@@ -110,7 +110,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
-        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
@@ -233,14 +233,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
-    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
     // set table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
-    assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+    assert(catalog.getTableMetadata(tableIdent).properties ==
+      Map("andrew" -> "or14", "kor" -> "bel"))
     // set table properties without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
-    assert(catalog.getTable(tableIdent).properties ==
+    assert(catalog.getTableMetadata(tableIdent).properties ==
       Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -262,11 +263,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     // unset table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
     sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
-    assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+    assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
     // unset table properties without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
-    assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+    assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
@@ -278,7 +279,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(e.getMessage.contains("xyz"))
     // property to unset does not exist, but "IF EXISTS" is specified
     sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
-    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
     // throw exception for datasource tables
     convertToDatasourceTable(catalog, tableIdent)
     val e1 = intercept[AnalysisException] {
@@ -393,7 +394,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private def convertToDatasourceTable(
       catalog: SessionCatalog,
       tableIdent: TableIdentifier): Unit = {
-    catalog.alterTable(catalog.getTable(tableIdent).copy(
+    catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
       properties = Map("spark.sql.sources.provider" -> "csv")))
   }

@@ -407,15 +408,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
-    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
     // Verify that the location is set to the expected string
     def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
       val storageFormat = spec
         .map { s => catalog.getPartition(tableIdent, s).storage }
-        .getOrElse { catalog.getTable(tableIdent).storage }
+        .getOrElse { catalog.getTableMetadata(tableIdent).storage }
       if (isDatasourceTable) {
         if (spec.isDefined) {
           assert(storageFormat.serdeProperties.isEmpty)
@@ -467,8 +468,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
-    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
     // set table serde and/or properties (should fail on datasource tables)
     if (isDatasourceTable) {
       val e1 = intercept[AnalysisException] {
@@ -482,22 +483,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(e2.getMessage.contains("datasource"))
     } else {
       sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
-      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
-      assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties.isEmpty)
       sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
         "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
-      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
-      assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
         Map("k" -> "v", "kay" -> "vee"))
     }
     // set serde properties only
     sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
-    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
      Map("k" -> "vvv", "kay" -> "vee"))
     // set things without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
-    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+    assert(catalog.getTableMetadata(tableIdent).storage.serdeProperties ==
       Map("k" -> "vvv", "kay" -> "veee"))
     // table to alter does not exist
     intercept[AnalysisException] {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 98a5998d03..b1156fb3e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -292,6 +292,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
     client.getFunction(db, funcName)
   }

+  override def functionExists(db: String, funcName: String): Boolean = withClient {
+    client.functionExists(db, funcName)
+  }
+
   override def listFunctions(db: String, pattern: String): Seq[String] = withClient {
     client.listFunctions(db, pattern)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index ee56f9d75d..94794b1572 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -232,6 +232,11 @@ private[hive] trait HiveClient {
   /** Return an existing function in the database, or None if it doesn't exist. */
   def getFunctionOption(db: String, name: String): Option[CatalogFunction]

+  /** Return whether a function exists in the specified database. */
+  final def functionExists(db: String, name: String): Boolean = {
+    getFunctionOption(db, name).isDefined
+  }
+
   /** Return the names of all functions that match the given pattern in the database. */
   def listFunctions(db: String, pattern: String): Seq[String]
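Note: functionExists is final and defined in terms of getFunctionOption, so version-specific client implementations only have to supply the Option-returning lookup. A self-contained analogue of that trait-level pattern (stub types, for illustration only):

    trait Lookup[K, V] {
      def getOption(key: K): Option[V]                              // implementors supply this
      final def exists(key: K): Boolean = getOption(key).isDefined  // derived for free
    }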
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1f66fbfd85..d0eb9ddf50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintStream}

 import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
-import scala.util.Try

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -30,7 +29,8 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -559,7 +559,11 @@ private[hive] class HiveClientImpl(
   override def getFunctionOption(
       db: String,
       name: String): Option[CatalogFunction] = withHiveState {
-    Option(client.getFunction(db, name)).map(fromHiveFunction)
+    try {
+      Option(client.getFunction(db, name)).map(fromHiveFunction)
+    } catch {
+      case he: HiveException => None
+    }
   }

   override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
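Note: the try/catch above makes getFunctionOption total, which the new functionExists default depends on; the Hive client is presumed here to throw HiveException for a missing function rather than returning null. A sketch of the general pattern (the helper is hypothetical, not part of the patch):

    import org.apache.hadoop.hive.ql.metadata.HiveException

    // Treat a lookup that throws HiveException as absent instead of failing.
    def asOption[T](lookup: => T): Option[T] =
      try Option(lookup) catch { case _: HiveException => None }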
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 6967395613..ada8621d07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
             .saveAsTable("t")
         }

-        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+        val hiveTable = sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
             .saveAsTable("t")
         }

-        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+        val hiveTable =
+          sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +145,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
             |AS SELECT 1 AS d1, "val_1" AS d2
           """.stripMargin)

-        val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default")))
+        val hiveTable =
+          sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
         assert(hiveTable.storage.inputFormat === Some(inputFormat))
         assert(hiveTable.storage.outputFormat === Some(outputFormat))
         assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index dd2129375d..c5417b06a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -354,7 +354,7 @@ object PermanentHiveUDFTest2 extends Logging {
       FunctionIdentifier("example_max"),
      "org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax",
       ("JAR" -> jar) :: Nil)
-    hiveContext.sessionState.catalog.createFunction(function)
+    hiveContext.sessionState.catalog.createFunction(function, ignoreIfExists = false)
     val source =
       hiveContext.createDataFrame((1 to 10).map(i => (i, s"str$i"))).toDF("key", "val")
     source.registerTempTable("sourceTable")