From 8e8f898335f5019c0d4f3944c4aefa12a185db70 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 16 Mar 2017 11:34:13 -0700 Subject: [SPARK-19945][SQL] add test suite for SessionCatalog with HiveExternalCatalog ## What changes were proposed in this pull request? Currently `SessionCatalogSuite` is only for `InMemoryCatalog`, there is no suite for `HiveExternalCatalog`. And there are some ddl function is not proper to test in `ExternalCatalogSuite`, because some logic are not full implement in `ExternalCatalog`, these ddl functions are full implement in `SessionCatalog`(e.g. merge the same logic from `ExternalCatalog` up to `SessionCatalog` ). It is better to test it in `SessionCatalogSuite` for this situation. So we should add a test suite for `SessionCatalog` with `HiveExternalCatalog` The main change is that in `SessionCatalogSuite` add two functions: `withBasicCatalog` and `withEmptyCatalog` And replace the code like `val catalog = new SessionCatalog(newBasicCatalog)` with above two functions ## How was this patch tested? add `HiveExternalSessionCatalogSuite` Author: windpiger Closes #17287 from windpiger/sessioncatalogsuit. --- .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../sql/catalyst/catalog/SessionCatalogSuite.scala | 1907 +++++++++++--------- .../sql/hive/HiveExternalSessionCatalogSuite.scala | 40 + 3 files changed, 1049 insertions(+), 900 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalSessionCatalogSuite.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index bfcdb70fe4..25aa8d3ba9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -48,7 +48,7 @@ object SessionCatalog { * This class must be thread-safe. */ class SessionCatalog( - externalCatalog: ExternalCatalog, + val externalCatalog: ExternalCatalog, globalTempViewManager: GlobalTempViewManager, functionRegistry: FunctionRegistry, conf: CatalystConf, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 7e74dcdef0..bb87763e0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -27,41 +27,67 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} +class InMemorySessionCatalogSuite extends SessionCatalogSuite { + protected val utils = new CatalogTestUtils { + override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat" + override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat" + override val defaultProvider: String = "parquet" + override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog + } +} + /** - * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented. + * Tests for [[SessionCatalog]] * * Note: many of the methods here are very similar to the ones in [[ExternalCatalogSuite]]. * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method * signatures but do not extend a common parent. This is largely by design but * unfortunately leads to very similar test code in two places. */ -class SessionCatalogSuite extends PlanTest { - private val utils = new CatalogTestUtils { - override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat" - override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat" - override val defaultProvider: String = "parquet" - override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog - } +abstract class SessionCatalogSuite extends PlanTest { + protected val utils: CatalogTestUtils + + protected val isHiveExternalCatalog = false import utils._ + private def withBasicCatalog(f: SessionCatalog => Unit): Unit = { + val catalog = new SessionCatalog(newBasicCatalog()) + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + try { + f(catalog) + } finally { + catalog.reset() + } + } + + private def withEmptyCatalog(f: SessionCatalog => Unit): Unit = { + val catalog = new SessionCatalog(newEmptyCatalog()) + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + try { + f(catalog) + } finally { + catalog.reset() + } + } // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- test("basic create and list databases") { - val catalog = new SessionCatalog(newEmptyCatalog()) - catalog.createDatabase(newDb("default"), ignoreIfExists = true) - assert(catalog.databaseExists("default")) - assert(!catalog.databaseExists("testing")) - assert(!catalog.databaseExists("testing2")) - catalog.createDatabase(newDb("testing"), ignoreIfExists = false) - assert(catalog.databaseExists("testing")) - assert(catalog.listDatabases().toSet == Set("default", "testing")) - catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) - assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2")) - assert(catalog.databaseExists("testing2")) - assert(!catalog.databaseExists("does_not_exist")) + withEmptyCatalog { catalog => + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + assert(catalog.databaseExists("default")) + assert(!catalog.databaseExists("testing")) + assert(!catalog.databaseExists("testing2")) + catalog.createDatabase(newDb("testing"), ignoreIfExists = false) + assert(catalog.databaseExists("testing")) + assert(catalog.listDatabases().toSet == Set("default", "testing")) + catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) + assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2")) + assert(catalog.databaseExists("testing2")) + assert(!catalog.databaseExists("does_not_exist")) + } } def testInvalidName(func: (String) => Unit) { @@ -76,121 +102,141 @@ class SessionCatalogSuite extends PlanTest { } test("create databases using invalid names") { - val catalog = new SessionCatalog(newEmptyCatalog()) - testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true)) + withEmptyCatalog { catalog => + testInvalidName( + name => catalog.createDatabase(newDb(name), ignoreIfExists = true)) + } } test("get database when a database exists") { - val catalog = new SessionCatalog(newBasicCatalog()) - val db1 = catalog.getDatabaseMetadata("db1") - assert(db1.name == "db1") - assert(db1.description.contains("db1")) + withBasicCatalog { catalog => + val db1 = catalog.getDatabaseMetadata("db1") + assert(db1.name == "db1") + assert(db1.description.contains("db1")) + } } test("get database should throw exception when the database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.getDatabaseMetadata("db_that_does_not_exist") + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.getDatabaseMetadata("db_that_does_not_exist") + } } } test("list databases without pattern") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3")) + withBasicCatalog { catalog => + assert(catalog.listDatabases().toSet == Set("default", "db1", "db2", "db3")) + } } test("list databases with pattern") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.listDatabases("db").toSet == Set.empty) - assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3")) - assert(catalog.listDatabases("*1").toSet == Set("db1")) - assert(catalog.listDatabases("db2").toSet == Set("db2")) + withBasicCatalog { catalog => + assert(catalog.listDatabases("db").toSet == Set.empty) + assert(catalog.listDatabases("db*").toSet == Set("db1", "db2", "db3")) + assert(catalog.listDatabases("*1").toSet == Set("db1")) + assert(catalog.listDatabases("db2").toSet == Set("db2")) + } } test("drop database") { - val catalog = new SessionCatalog(newBasicCatalog()) - catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) - assert(catalog.listDatabases().toSet == Set("default", "db2", "db3")) + withBasicCatalog { catalog => + catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) + assert(catalog.listDatabases().toSet == Set("default", "db2", "db3")) + } } test("drop database when the database is not empty") { // Throw exception if there are functions left - val externalCatalog1 = newBasicCatalog() - val sessionCatalog1 = new SessionCatalog(externalCatalog1) - externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false) - externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false) - intercept[AnalysisException] { - sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + withBasicCatalog { catalog => + catalog.externalCatalog.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false) + catalog.externalCatalog.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false) + intercept[AnalysisException] { + catalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } } - - // Throw exception if there are tables left - val externalCatalog2 = newBasicCatalog() - val sessionCatalog2 = new SessionCatalog(externalCatalog2) - externalCatalog2.dropFunction("db2", "func1") - intercept[AnalysisException] { - sessionCatalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + withBasicCatalog { catalog => + // Throw exception if there are tables left + catalog.externalCatalog.dropFunction("db2", "func1") + intercept[AnalysisException] { + catalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } } - // When cascade is true, it should drop them - val externalCatalog3 = newBasicCatalog() - val sessionCatalog3 = new SessionCatalog(externalCatalog3) - externalCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) - assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1", "db3")) + withBasicCatalog { catalog => + // When cascade is true, it should drop them + catalog.externalCatalog.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) + assert(catalog.listDatabases().toSet == Set("default", "db1", "db3")) + } } test("drop database when the database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) + withBasicCatalog { catalog => + // TODO: fix this inconsistent between HiveExternalCatalog and InMemoryCatalog + if (isHiveExternalCatalog) { + val e = intercept[AnalysisException] { + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) + }.getMessage + assert(e.contains( + "org.apache.hadoop.hive.metastore.api.NoSuchObjectException: db_that_does_not_exist")) + } else { + intercept[NoSuchDatabaseException] { + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) + } + } + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) } - catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) } test("drop current database and drop default database") { - val catalog = new SessionCatalog(newBasicCatalog()) - catalog.setCurrentDatabase("db1") - assert(catalog.getCurrentDatabase == "db1") - catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = true) - intercept[NoSuchDatabaseException] { - catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false) - } - catalog.setCurrentDatabase("default") - assert(catalog.getCurrentDatabase == "default") - intercept[AnalysisException] { - catalog.dropDatabase("default", ignoreIfNotExists = false, cascade = true) + withBasicCatalog { catalog => + catalog.setCurrentDatabase("db1") + assert(catalog.getCurrentDatabase == "db1") + catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = true) + intercept[NoSuchDatabaseException] { + catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false) + } + catalog.setCurrentDatabase("default") + assert(catalog.getCurrentDatabase == "default") + intercept[AnalysisException] { + catalog.dropDatabase("default", ignoreIfNotExists = false, cascade = true) + } } } test("alter database") { - val catalog = new SessionCatalog(newBasicCatalog()) - val db1 = catalog.getDatabaseMetadata("db1") - // Note: alter properties here because Hive does not support altering other fields - catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) - val newDb1 = catalog.getDatabaseMetadata("db1") - assert(db1.properties.isEmpty) - assert(newDb1.properties.size == 2) - assert(newDb1.properties.get("k") == Some("v3")) - assert(newDb1.properties.get("good") == Some("true")) + withBasicCatalog { catalog => + val db1 = catalog.getDatabaseMetadata("db1") + // Note: alter properties here because Hive does not support altering other fields + catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) + val newDb1 = catalog.getDatabaseMetadata("db1") + assert(db1.properties.isEmpty) + assert(newDb1.properties.size == 2) + assert(newDb1.properties.get("k") == Some("v3")) + assert(newDb1.properties.get("good") == Some("true")) + } } test("alter database should throw exception when the database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.alterDatabase(newDb("unknown_db")) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.alterDatabase(newDb("unknown_db")) + } } } test("get/set current database") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.getCurrentDatabase == "default") - catalog.setCurrentDatabase("db2") - assert(catalog.getCurrentDatabase == "db2") - intercept[NoSuchDatabaseException] { + withBasicCatalog { catalog => + assert(catalog.getCurrentDatabase == "default") + catalog.setCurrentDatabase("db2") + assert(catalog.getCurrentDatabase == "db2") + intercept[NoSuchDatabaseException] { + catalog.setCurrentDatabase("deebo") + } + catalog.createDatabase(newDb("deebo"), ignoreIfExists = false) catalog.setCurrentDatabase("deebo") + assert(catalog.getCurrentDatabase == "deebo") } - catalog.createDatabase(newDb("deebo"), ignoreIfExists = false) - catalog.setCurrentDatabase("deebo") - assert(catalog.getCurrentDatabase == "deebo") } // -------------------------------------------------------------------------- @@ -198,346 +244,360 @@ class SessionCatalogSuite extends PlanTest { // -------------------------------------------------------------------------- test("create table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(externalCatalog.listTables("db1").isEmpty) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - sessionCatalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false) - sessionCatalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false) - assert(externalCatalog.listTables("db1").toSet == Set("tbl3")) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) - // Create table without explicitly specifying database - sessionCatalog.setCurrentDatabase("db1") - sessionCatalog.createTable(newTable("tbl4"), ignoreIfExists = false) - assert(externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4")) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) + withBasicCatalog { catalog => + assert(catalog.externalCatalog.listTables("db1").isEmpty) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + catalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false) + catalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false) + assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3")) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) + // Create table without explicitly specifying database + catalog.setCurrentDatabase("db1") + catalog.createTable(newTable("tbl4"), ignoreIfExists = false) + assert(catalog.externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4")) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) + } } test("create tables using invalid names") { - val catalog = new SessionCatalog(newEmptyCatalog()) - testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false)) + withEmptyCatalog { catalog => + testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false)) + } } test("create table when database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - // Creating table in non-existent database should always fail - intercept[NoSuchDatabaseException] { - catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false) - } - intercept[NoSuchDatabaseException] { - catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true) - } - // Table already exists - intercept[TableAlreadyExistsException] { - catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) + withBasicCatalog { catalog => + // Creating table in non-existent database should always fail + intercept[NoSuchDatabaseException] { + catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false) + } + intercept[NoSuchDatabaseException] { + catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true) + } + // Table already exists + intercept[TableAlreadyExistsException] { + catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) + } + catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true) } - catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true) } test("create temp table") { - val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable1 = Range(1, 10, 1, 10) - val tempTable2 = Range(1, 20, 2, 10) - catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) - catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) - assert(catalog.getTempView("tbl1") == Option(tempTable1)) - assert(catalog.getTempView("tbl2") == Option(tempTable2)) - assert(catalog.getTempView("tbl3").isEmpty) - // Temporary table already exists - intercept[TempTableAlreadyExistsException] { + withBasicCatalog { catalog => + val tempTable1 = Range(1, 10, 1, 10) + val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) + catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) + assert(catalog.getTempView("tbl1") == Option(tempTable1)) + assert(catalog.getTempView("tbl2") == Option(tempTable2)) + assert(catalog.getTempView("tbl3").isEmpty) + // Temporary table already exists + intercept[TempTableAlreadyExistsException] { + catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) + } + // Temporary table already exists but we override it + catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) + assert(catalog.getTempView("tbl1") == Option(tempTable2)) } - // Temporary table already exists but we override it - catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) - assert(catalog.getTempView("tbl1") == Option(tempTable2)) } test("drop table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, - purge = false) - assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) - // Drop table without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false) - assert(externalCatalog.listTables("db2").isEmpty) + withBasicCatalog { catalog => + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, + purge = false) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2")) + // Drop table without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false) + assert(catalog.externalCatalog.listTables("db2").isEmpty) + } } test("drop table when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - // Should always throw exception when the database does not exist - intercept[NoSuchDatabaseException] { - catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false, - purge = false) - } - intercept[NoSuchDatabaseException] { - catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true, - purge = false) - } - intercept[NoSuchTableException] { - catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false, + withBasicCatalog { catalog => + // Should always throw exception when the database does not exist + intercept[NoSuchDatabaseException] { + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false, + purge = false) + } + intercept[NoSuchDatabaseException] { + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true, + purge = false) + } + intercept[NoSuchTableException] { + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false, + purge = false) + } + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true, purge = false) } - catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true, - purge = false) } test("drop temp table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10) - sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) - sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - // If database is not specified, temp table should be dropped first - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(sessionCatalog.getTempView("tbl1") == None) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - // If temp table does not exist, the table in the current database should be dropped - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) - // If database is specified, temp tables are never dropped - sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) - sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) - sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, - purge = false) - assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) - assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) + withBasicCatalog { catalog => + val tempTable = Range(1, 10, 2, 10) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.setCurrentDatabase("db2") + assert(catalog.getTempView("tbl1") == Some(tempTable)) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is not specified, temp table should be dropped first + catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) + assert(catalog.getTempView("tbl1") == None) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If temp table does not exist, the table in the current database should be dropped + catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2")) + // If database is specified, temp tables are never dropped + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) + catalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, + purge = false) + assert(catalog.getTempView("tbl1") == Some(tempTable)) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl2")) + } } test("rename table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - sessionCatalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone")) - assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2")) - sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo")) - assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo")) - // Rename table without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two")) - assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two")) - // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match - intercept[AnalysisException] { - sessionCatalog.renameTable( - TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1"))) - } - // The new table already exists - intercept[TableAlreadyExistsException] { - sessionCatalog.renameTable( - TableIdentifier("tblone", Some("db2")), - TableIdentifier("table_two")) + withBasicCatalog { catalog => + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone")) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2")) + catalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo")) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo")) + // Rename table without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two")) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tblone", "table_two")) + // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match + intercept[AnalysisException] { + catalog.renameTable( + TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1"))) + } + // The new table already exists + intercept[TableAlreadyExistsException] { + catalog.renameTable( + TableIdentifier("tblone", Some("db2")), + TableIdentifier("table_two")) + } } } test("rename tables to an invalid name") { - val catalog = new SessionCatalog(newBasicCatalog()) - testInvalidName( - name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name))) + withBasicCatalog { catalog => + testInvalidName( + name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name))) + } } test("rename table when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2")) - } - intercept[NoSuchTableException] { - catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2")) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.renameTable(TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2")) + } + intercept[NoSuchTableException] { + catalog.renameTable(TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2")) + } } } test("rename temp table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10) - sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) - sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempView("tbl1") == Option(tempTable)) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - // If database is not specified, temp table should be renamed first - sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) - assert(sessionCatalog.getTempView("tbl1").isEmpty) - assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - // If database is specified, temp tables are never renamed - sessionCatalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4")) - assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) - assert(sessionCatalog.getTempView("tbl4").isEmpty) - assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) + withBasicCatalog { catalog => + val tempTable = Range(1, 10, 2, 10) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.setCurrentDatabase("db2") + assert(catalog.getTempView("tbl1") == Option(tempTable)) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is not specified, temp table should be renamed first + catalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) + assert(catalog.getTempView("tbl1").isEmpty) + assert(catalog.getTempView("tbl3") == Option(tempTable)) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is specified, temp tables are never renamed + catalog.renameTable(TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4")) + assert(catalog.getTempView("tbl3") == Option(tempTable)) + assert(catalog.getTempView("tbl4").isEmpty) + assert(catalog.externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) + } } test("alter table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - val tbl1 = externalCatalog.getTable("db2", "tbl1") - sessionCatalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem"))) - val newTbl1 = externalCatalog.getTable("db2", "tbl1") - assert(!tbl1.properties.contains("toh")) - assert(newTbl1.properties.size == tbl1.properties.size + 1) - assert(newTbl1.properties.get("toh") == Some("frem")) - // Alter table without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1"))) - val newestTbl1 = externalCatalog.getTable("db2", "tbl1") - assert(newestTbl1 == tbl1) + withBasicCatalog { catalog => + val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1") + catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem"))) + val newTbl1 = catalog.externalCatalog.getTable("db2", "tbl1") + assert(!tbl1.properties.contains("toh")) + assert(newTbl1.properties.size == tbl1.properties.size + 1) + assert(newTbl1.properties.get("toh") == Some("frem")) + // Alter table without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1"))) + val newestTbl1 = catalog.externalCatalog.getTable("db2", "tbl1") + // For hive serde table, hive metastore will set transient_lastDdlTime in table's properties, + // and its value will be modified, here we ignore it when comparing the two tables. + assert(newestTbl1.copy(properties = Map.empty) == tbl1.copy(properties = Map.empty)) + } } test("alter table when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.alterTable(newTable("tbl1", "unknown_db")) - } - intercept[NoSuchTableException] { - catalog.alterTable(newTable("unknown_table", "db2")) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.alterTable(newTable("tbl1", "unknown_db")) + } + intercept[NoSuchTableException] { + catalog.alterTable(newTable("unknown_table", "db2")) + } } } test("get table") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) - == externalCatalog.getTable("db2", "tbl1")) - // Get table without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTableMetadata(TableIdentifier("tbl1")) - == externalCatalog.getTable("db2", "tbl1")) + withBasicCatalog { catalog => + assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) + == catalog.externalCatalog.getTable("db2", "tbl1")) + // Get table without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.getTableMetadata(TableIdentifier("tbl1")) + == catalog.externalCatalog.getTable("db2", "tbl1")) + } } test("get table when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db"))) - } - intercept[NoSuchTableException] { - catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2"))) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db"))) + } + intercept[NoSuchTableException] { + catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2"))) + } } } test("get option of table metadata") { - val externalCatalog = newBasicCatalog() - val catalog = new SessionCatalog(externalCatalog) - assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2"))) - == Option(externalCatalog.getTable("db2", "tbl1"))) - assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty) - intercept[NoSuchDatabaseException] { - catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db"))) + withBasicCatalog { catalog => + assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2"))) + == Option(catalog.externalCatalog.getTable("db2", "tbl1"))) + assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty) + intercept[NoSuchDatabaseException] { + catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db"))) + } } } test("lookup table relation") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable1 = Range(1, 10, 1, 10) - val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") - sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false) - sessionCatalog.setCurrentDatabase("db2") - // If we explicitly specify the database, we'll look up the relation in that database - assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) - // Otherwise, we'll first look up a temporary table with the same name - assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", tempTable1)) - // Then, if that does not exist, look up the relation in the current database - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head - .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + withBasicCatalog { catalog => + val tempTable1 = Range(1, 10, 1, 10) + val metastoreTable1 = catalog.externalCatalog.getTable("db2", "tbl1") + catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) + catalog.setCurrentDatabase("db2") + // If we explicitly specify the database, we'll look up the relation in that database + assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head + .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + // Otherwise, we'll first look up a temporary table with the same name + assert(catalog.lookupRelation(TableIdentifier("tbl1")) + == SubqueryAlias("tbl1", tempTable1)) + // Then, if that does not exist, look up the relation in the current database + catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) + assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head + .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) + } } test("look up view relation") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - val metadata = externalCatalog.getTable("db3", "view1") - sessionCatalog.setCurrentDatabase("default") - // Look up a view. - assert(metadata.viewText.isDefined) - val view = View(desc = metadata, output = metadata.schema.toAttributes, - child = CatalystSqlParser.parsePlan(metadata.viewText.get)) - comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1", Some("db3"))), - SubqueryAlias("view1", view)) - // Look up a view using current database of the session catalog. - sessionCatalog.setCurrentDatabase("db3") - comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")), - SubqueryAlias("view1", view)) + withBasicCatalog { catalog => + val metadata = catalog.externalCatalog.getTable("db3", "view1") + catalog.setCurrentDatabase("default") + // Look up a view. + assert(metadata.viewText.isDefined) + val view = View(desc = metadata, output = metadata.schema.toAttributes, + child = CatalystSqlParser.parsePlan(metadata.viewText.get)) + comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))), + SubqueryAlias("view1", view)) + // Look up a view using current database of the session catalog. + catalog.setCurrentDatabase("db3") + comparePlans(catalog.lookupRelation(TableIdentifier("view1")), + SubqueryAlias("view1", view)) + } } test("table exists") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) - assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) - assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) - assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) - assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) - // If database is explicitly specified, do not check temporary tables - val tempTable = Range(1, 10, 1, 10) - assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) - // If database is not explicitly specified, check the current database - catalog.setCurrentDatabase("db2") - assert(catalog.tableExists(TableIdentifier("tbl1"))) - assert(catalog.tableExists(TableIdentifier("tbl2"))) - - catalog.createTempView("tbl3", tempTable, overrideIfExists = false) - // tableExists should not check temp view. - assert(!catalog.tableExists(TableIdentifier("tbl3"))) + withBasicCatalog { catalog => + assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) + assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) + assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) + // If database is explicitly specified, do not check temporary tables + val tempTable = Range(1, 10, 1, 10) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + // If database is not explicitly specified, check the current database + catalog.setCurrentDatabase("db2") + assert(catalog.tableExists(TableIdentifier("tbl1"))) + assert(catalog.tableExists(TableIdentifier("tbl2"))) + + catalog.createTempView("tbl3", tempTable, overrideIfExists = false) + // tableExists should not check temp view. + assert(!catalog.tableExists(TableIdentifier("tbl3"))) + } } test("getTempViewOrPermanentTableMetadata on temporary views") { - val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10) - intercept[NoSuchTableException] { - catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1")) - }.getMessage + withBasicCatalog { catalog => + val tempTable = Range(1, 10, 2, 10) + intercept[NoSuchTableException] { + catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1")) + }.getMessage - intercept[NoSuchTableException] { - catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default"))) - }.getMessage + intercept[NoSuchTableException] { + catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default"))) + }.getMessage - catalog.createTempView("view1", tempTable, overrideIfExists = false) - assert(catalog.getTempViewOrPermanentTableMetadata( - TableIdentifier("view1")).identifier.table == "view1") - assert(catalog.getTempViewOrPermanentTableMetadata( - TableIdentifier("view1")).schema(0).name == "id") + catalog.createTempView("view1", tempTable, overrideIfExists = false) + assert(catalog.getTempViewOrPermanentTableMetadata( + TableIdentifier("view1")).identifier.table == "view1") + assert(catalog.getTempViewOrPermanentTableMetadata( + TableIdentifier("view1")).schema(0).name == "id") - intercept[NoSuchTableException] { - catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default"))) - }.getMessage + intercept[NoSuchTableException] { + catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default"))) + }.getMessage + } } test("list tables without pattern") { - val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10) - catalog.createTempView("tbl1", tempTable, overrideIfExists = false) - catalog.createTempView("tbl4", tempTable, overrideIfExists = false) - assert(catalog.listTables("db1").toSet == - Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))) - assert(catalog.listTables("db2").toSet == - Set(TableIdentifier("tbl1"), - TableIdentifier("tbl4"), - TableIdentifier("tbl1", Some("db2")), - TableIdentifier("tbl2", Some("db2")))) - intercept[NoSuchDatabaseException] { - catalog.listTables("unknown_db") + withBasicCatalog { catalog => + val tempTable = Range(1, 10, 2, 10) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.createTempView("tbl4", tempTable, overrideIfExists = false) + assert(catalog.listTables("db1").toSet == + Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))) + assert(catalog.listTables("db2").toSet == + Set(TableIdentifier("tbl1"), + TableIdentifier("tbl4"), + TableIdentifier("tbl1", Some("db2")), + TableIdentifier("tbl2", Some("db2")))) + intercept[NoSuchDatabaseException] { + catalog.listTables("unknown_db") + } } } test("list tables with pattern") { - val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10) - catalog.createTempView("tbl1", tempTable, overrideIfExists = false) - catalog.createTempView("tbl4", tempTable, overrideIfExists = false) - assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) - assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet) - assert(catalog.listTables("db2", "tbl*").toSet == - Set(TableIdentifier("tbl1"), - TableIdentifier("tbl4"), - TableIdentifier("tbl1", Some("db2")), - TableIdentifier("tbl2", Some("db2")))) - assert(catalog.listTables("db2", "*1").toSet == - Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) - intercept[NoSuchDatabaseException] { - catalog.listTables("unknown_db", "*") + withBasicCatalog { catalog => + val tempTable = Range(1, 10, 2, 10) + catalog.createTempView("tbl1", tempTable, overrideIfExists = false) + catalog.createTempView("tbl4", tempTable, overrideIfExists = false) + assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) + assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet) + assert(catalog.listTables("db2", "tbl*").toSet == + Set(TableIdentifier("tbl1"), + TableIdentifier("tbl4"), + TableIdentifier("tbl1", Some("db2")), + TableIdentifier("tbl2", Some("db2")))) + assert(catalog.listTables("db2", "*1").toSet == + Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) + intercept[NoSuchDatabaseException] { + catalog.listTables("unknown_db", "*") + } } } @@ -546,451 +606,477 @@ class SessionCatalogSuite extends PlanTest { // -------------------------------------------------------------------------- test("basic create and list partitions") { - val externalCatalog = newEmptyCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false) - sessionCatalog.createPartitions( - TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false) - assert(catalogPartitionsEqual(externalCatalog.listPartitions("mydb", "tbl"), part1, part2)) - // Create partitions without explicitly specifying database - sessionCatalog.setCurrentDatabase("mydb") - sessionCatalog.createPartitions( - TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false) - assert(catalogPartitionsEqual( - externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder)) + withEmptyCatalog { catalog => + catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + catalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false) + catalog.createPartitions( + TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions("mydb", "tbl"), part1, part2)) + // Create partitions without explicitly specifying database + catalog.setCurrentDatabase("mydb") + catalog.createPartitions( + TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false) + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions("mydb", "tbl"), part1, part2, partWithMixedOrder)) + } } test("create partitions when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.createPartitions( - TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false) - } - intercept[NoSuchTableException] { - catalog.createPartitions( - TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.createPartitions( + TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false) + } + intercept[NoSuchTableException] { + catalog.createPartitions( + TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false) + } } } test("create partitions that already exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + withBasicCatalog { catalog => + intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false) + } catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false) + TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true) } - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true) } test("create partitions with invalid part spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1, partWithLessColumns), ignoreIfExists = false) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1, partWithMoreColumns), ignoreIfExists = true) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithUnknownColumns, part1), ignoreIfExists = true) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithEmptyValue, part1), ignoreIfExists = true) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(part1, partWithLessColumns), ignoreIfExists = false) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(part1, partWithMoreColumns), ignoreIfExists = true) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithUnknownColumns, part1), ignoreIfExists = true) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithEmptyValue, part1), ignoreIfExists = true) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("drop partitions") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) - sessionCatalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2)) - // Drop partitions without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.dropPartitions( - TableIdentifier("tbl2"), - Seq(part2.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) - // Drop multiple partitions at once - sessionCatalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false) - assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) - sessionCatalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1.spec, part2.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) - } - - test("drop partitions when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { + withBasicCatalog { catalog => + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) catalog.dropPartitions( - TableIdentifier("tbl1", Some("unknown_db")), - Seq(), + TableIdentifier("tbl2", Some("db2")), + Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false) - } - intercept[NoSuchTableException] { + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions("db2", "tbl2"), part2)) + // Drop partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") catalog.dropPartitions( - TableIdentifier("does_not_exist", Some("db2")), - Seq(), + TableIdentifier("tbl2"), + Seq(part2.spec), ignoreIfNotExists = false, purge = false, retainData = false) - } - } - - test("drop partitions that do not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty) + // Drop multiple partitions at once + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), - Seq(part3.spec), + Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false, retainData = false) + assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty) } - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part3.spec), - ignoreIfNotExists = true, - purge = false, - retainData = false) } - test("drop partitions with invalid partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithMoreColumns.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) + test("drop partitions when database/table does not exist") { + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.dropPartitions( + TableIdentifier("tbl1", Some("unknown_db")), + Seq(), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } + intercept[NoSuchTableException] { + catalog.dropPartitions( + TableIdentifier("does_not_exist", Some("db2")), + Seq(), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } } - assert(e.getMessage.contains( - "Partition spec is invalid. The spec (a, b, c) must be contained within " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { + } + + test("drop partitions that do not exist") { + withBasicCatalog { catalog => + intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(part3.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), - Seq(partWithUnknownColumns.spec), - ignoreIfNotExists = false, + Seq(part3.spec), + ignoreIfNotExists = true, purge = false, retainData = false) } - assert(e.getMessage.contains( - "Partition spec is invalid. The spec (a, unknown) must be contained within " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithEmptyValue.spec, part1.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) + } + + test("drop partitions with invalid partition spec") { + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithMoreColumns.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } + assert(e.getMessage.contains( + "Partition spec is invalid. The spec (a, b, c) must be contained within " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithUnknownColumns.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } + assert(e.getMessage.contains( + "Partition spec is invalid. The spec (a, unknown) must be contained within " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithEmptyValue.spec, part1.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("get partition") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalog.getPartition( - TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec) - assert(catalog.getPartition( - TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec) - // Get partition without explicitly specifying database - catalog.setCurrentDatabase("db2") - assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec) - assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec) - // Get non-existent partition - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl2"), part3.spec) + withBasicCatalog { catalog => + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec) + // Get partition without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec) + assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec) + // Get non-existent partition + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), part3.spec) + } } } test("get partition when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec) - } - intercept[NoSuchTableException] { - catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec) + } + intercept[NoSuchTableException] { + catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec) + } } } test("get partition with invalid partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("rename partitions") { - val catalog = new SessionCatalog(newBasicCatalog()) - val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) - val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) - val newSpecs = Seq(newPart1.spec, newPart2.spec) - catalog.renamePartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs) - assert(catalog.getPartition( - TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec) - assert(catalog.getPartition( - TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec) - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) - } - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) - } - // Rename partitions without explicitly specifying database - catalog.setCurrentDatabase("db2") - catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec)) - assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec) - assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec) - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec) - } - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec) + withBasicCatalog { catalog => + val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) + val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) + val newSpecs = Seq(newPart1.spec, newPart2.spec) + catalog.renamePartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec) + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + } + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + } + // Rename partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec)) + assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec === part1.spec) + assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec) + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec) + } + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec) + } } } test("rename partitions when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.renamePartitions( - TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec)) - } - intercept[NoSuchTableException] { - catalog.renamePartitions( - TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec)) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec)) + } + intercept[NoSuchTableException] { + catalog.renamePartitions( + TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec)) + } } } test("rename partition with invalid partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.renamePartitions( - TableIdentifier("tbl1", Some("db2")), - Seq(part1.spec), Seq(partWithLessColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.renamePartitions( - TableIdentifier("tbl1", Some("db2")), - Seq(part1.spec), Seq(partWithMoreColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.renamePartitions( - TableIdentifier("tbl1", Some("db2")), - Seq(part1.spec), Seq(partWithUnknownColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.renamePartitions( - TableIdentifier("tbl1", Some("db2")), - Seq(part1.spec), Seq(partWithEmptyValue.spec)) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("db2")), + Seq(part1.spec), Seq(partWithLessColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("db2")), + Seq(part1.spec), Seq(partWithMoreColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("db2")), + Seq(part1.spec), Seq(partWithUnknownColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("db2")), + Seq(part1.spec), Seq(partWithEmptyValue.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("alter partitions") { - val catalog = new SessionCatalog(newBasicCatalog()) - val newLocation = newUriForDatabase() - // Alter but keep spec the same - val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) - val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) - catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) - val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) - val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) - assert(newPart1.storage.locationUri == Some(newLocation)) - assert(newPart2.storage.locationUri == Some(newLocation)) - assert(oldPart1.storage.locationUri != Some(newLocation)) - assert(oldPart2.storage.locationUri != Some(newLocation)) - // Alter partitions without explicitly specifying database - catalog.setCurrentDatabase("db2") - catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2)) - val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec) - val newerPart2 = catalog.getPartition(TableIdentifier("tbl2"), part2.spec) - assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri) - assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri) - // Alter but change spec, should fail because new partition specs do not exist yet - val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) - val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) - intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2)) + withBasicCatalog { catalog => + val newLocation = newUriForDatabase() + // Alter but keep spec the same + val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq( + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + assert(newPart1.storage.locationUri == Some(newLocation)) + assert(newPart2.storage.locationUri == Some(newLocation)) + assert(oldPart1.storage.locationUri != Some(newLocation)) + assert(oldPart2.storage.locationUri != Some(newLocation)) + // Alter partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2)) + val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec) + val newerPart2 = catalog.getPartition(TableIdentifier("tbl2"), part2.spec) + assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri) + assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri) + // Alter but change spec, should fail because new partition specs do not exist yet + val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) + val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) + intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2)) + } } } test("alter partitions when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1)) - } - intercept[NoSuchTableException] { - catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1)) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1)) + } + intercept[NoSuchTableException] { + catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1)) + } } } test("alter partition with invalid partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + - "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) - e = intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue)) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + + "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) + e = intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("list partition names") { - val catalog = new SessionCatalog(newBasicCatalog()) - val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4") - assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) == - expectedPartitionNames) - // List partition names without explicitly specifying database - catalog.setCurrentDatabase("db2") - assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames) + withBasicCatalog { catalog => + val expectedPartitionNames = Seq("a=1/b=2", "a=3/b=4") + assert(catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2"))) == + expectedPartitionNames) + // List partition names without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.listPartitionNames(TableIdentifier("tbl2")) == expectedPartitionNames) + } } test("list partition names with partial partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert( - catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) == - Seq("a=1/b=2")) + withBasicCatalog { catalog => + assert( + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))) == + Seq("a=1/b=2")) + } } test("list partition names with invalid partial partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), - Some(partWithMoreColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " + - "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), - Some(partWithUnknownColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " + - "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), - Some(partWithEmptyValue.spec)) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), + Some(partWithMoreColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " + + "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), + Some(partWithUnknownColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " + + "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")), + Some(partWithEmptyValue.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("list partitions") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalogPartitionsEqual( - catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2)) - // List partitions without explicitly specifying database - catalog.setCurrentDatabase("db2") - assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2)) + withBasicCatalog { catalog => + assert(catalogPartitionsEqual( + catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))), part1, part2)) + // List partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalogPartitionsEqual(catalog.listPartitions(TableIdentifier("tbl2")), part1, part2)) + } } test("list partitions with partial partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - assert(catalogPartitionsEqual( - catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1)) + withBasicCatalog { catalog => + assert(catalogPartitionsEqual( + catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(Map("a" -> "1"))), part1)) + } } test("list partitions with invalid partial partition spec") { - val catalog = new SessionCatalog(newBasicCatalog()) - var e = intercept[AnalysisException] { - catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " + - "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), - Some(partWithUnknownColumns.spec)) - } - assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " + - "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec)) + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " + + "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), + Some(partWithUnknownColumns.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " + + "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec)) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + + "empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " + - "empty partition column value")) } test("list partitions when database/table does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db"))) - } - intercept[NoSuchTableException] { - catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2"))) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db"))) + } + intercept[NoSuchTableException] { + catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2"))) + } } } @@ -999,8 +1085,17 @@ class SessionCatalogSuite extends PlanTest { expectedParts: CatalogTablePartition*): Boolean = { // ExternalCatalog may set a default location for partitions, here we ignore the partition // location when comparing them. - actualParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet == - expectedParts.map(p => p.copy(storage = p.storage.copy(locationUri = None))).toSet + // And for hive serde table, hive metastore will set some values(e.g.transient_lastDdlTime) + // in table's parameters and storage's properties, here we also ignore them. + val actualPartsNormalize = actualParts.map(p => + p.copy(parameters = Map.empty, storage = p.storage.copy( + properties = Map.empty, locationUri = None, serde = None))).toSet + + val expectedPartsNormalize = expectedParts.map(p => + p.copy(parameters = Map.empty, storage = p.storage.copy( + properties = Map.empty, locationUri = None, serde = None))).toSet + + actualPartsNormalize == expectedPartsNormalize } // -------------------------------------------------------------------------- @@ -1008,248 +1103,258 @@ class SessionCatalogSuite extends PlanTest { // -------------------------------------------------------------------------- test("basic create and list functions") { - val externalCatalog = newEmptyCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false) - sessionCatalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false) - assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc")) - // Create function without explicitly specifying database - sessionCatalog.setCurrentDatabase("mydb") - sessionCatalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false) - assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2")) + withEmptyCatalog { catalog => + catalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + catalog.createFunction(newFunc("myfunc", Some("mydb")), ignoreIfExists = false) + assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc")) + // Create function without explicitly specifying database + catalog.setCurrentDatabase("mydb") + catalog.createFunction(newFunc("myfunc2"), ignoreIfExists = false) + assert(catalog.externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2")) + } } test("create function when database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.createFunction( - newFunc("func5", Some("does_not_exist")), ignoreIfExists = false) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.createFunction( + newFunc("func5", Some("does_not_exist")), ignoreIfExists = false) + } } } test("create function that already exists") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[FunctionAlreadyExistsException] { - catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false) + withBasicCatalog { catalog => + intercept[FunctionAlreadyExistsException] { + catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false) + } + catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true) } - catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true) } test("create temp function") { - val catalog = new SessionCatalog(newBasicCatalog()) - val tempFunc1 = (e: Seq[Expression]) => e.head - val tempFunc2 = (e: Seq[Expression]) => e.last - val info1 = new ExpressionInfo("tempFunc1", "temp1") - val info2 = new ExpressionInfo("tempFunc2", "temp2") - catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) - catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false) - val arguments = Seq(Literal(1), Literal(2), Literal(3)) - assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1)) - assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3)) - // Temporary function does not exist. - intercept[NoSuchFunctionException] { - catalog.lookupFunction(FunctionIdentifier("temp3"), arguments) - } - val tempFunc3 = (e: Seq[Expression]) => Literal(e.size) - val info3 = new ExpressionInfo("tempFunc3", "temp1") - // Temporary function already exists - intercept[TempFunctionAlreadyExistsException] { - catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false) - } - // Temporary function is overridden - catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true) - assert( - catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(arguments.length)) + withBasicCatalog { catalog => + val tempFunc1 = (e: Seq[Expression]) => e.head + val tempFunc2 = (e: Seq[Expression]) => e.last + val info1 = new ExpressionInfo("tempFunc1", "temp1") + val info2 = new ExpressionInfo("tempFunc2", "temp2") + catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) + catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false) + val arguments = Seq(Literal(1), Literal(2), Literal(3)) + assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1)) + assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3)) + // Temporary function does not exist. + intercept[NoSuchFunctionException] { + catalog.lookupFunction(FunctionIdentifier("temp3"), arguments) + } + val tempFunc3 = (e: Seq[Expression]) => Literal(e.size) + val info3 = new ExpressionInfo("tempFunc3", "temp1") + // Temporary function already exists + intercept[TempFunctionAlreadyExistsException] { + catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false) + } + // Temporary function is overridden + catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true) + assert( + catalog.lookupFunction( + FunctionIdentifier("temp1"), arguments) === Literal(arguments.length)) + } } test("isTemporaryFunction") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - - // Returns false when the function does not exist - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1"))) + withBasicCatalog { catalog => + // Returns false when the function does not exist + assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1"))) - val tempFunc1 = (e: Seq[Expression]) => e.head - val info1 = new ExpressionInfo("tempFunc1", "temp1") - sessionCatalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) + val tempFunc1 = (e: Seq[Expression]) => e.head + val info1 = new ExpressionInfo("tempFunc1", "temp1") + catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) - // Returns true when the function is temporary - assert(sessionCatalog.isTemporaryFunction(FunctionIdentifier("temp1"))) + // Returns true when the function is temporary + assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1"))) - // Returns false when the function is permanent - assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1", Some("db2")))) - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("db2.func1"))) - sessionCatalog.setCurrentDatabase("db2") - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("func1"))) + // Returns false when the function is permanent + assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + assert(!catalog.isTemporaryFunction(FunctionIdentifier("func1", Some("db2")))) + assert(!catalog.isTemporaryFunction(FunctionIdentifier("db2.func1"))) + catalog.setCurrentDatabase("db2") + assert(!catalog.isTemporaryFunction(FunctionIdentifier("func1"))) - // Returns false when the function is built-in or hive - assert(FunctionRegistry.builtin.functionExists("sum")) - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("sum"))) - assert(!sessionCatalog.isTemporaryFunction(FunctionIdentifier("histogram_numeric"))) + // Returns false when the function is built-in or hive + assert(FunctionRegistry.builtin.functionExists("sum")) + assert(!catalog.isTemporaryFunction(FunctionIdentifier("sum"))) + assert(!catalog.isTemporaryFunction(FunctionIdentifier("histogram_numeric"))) + } } test("drop function") { - val externalCatalog = newBasicCatalog() - val sessionCatalog = new SessionCatalog(externalCatalog) - assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) - sessionCatalog.dropFunction( - FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false) - assert(externalCatalog.listFunctions("db2", "*").isEmpty) - // Drop function without explicitly specifying database - sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) - assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2")) - sessionCatalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false) - assert(externalCatalog.listFunctions("db2", "*").isEmpty) + withBasicCatalog { catalog => + assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + catalog.dropFunction( + FunctionIdentifier("func1", Some("db2")), ignoreIfNotExists = false) + assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) + // Drop function without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) + assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func2")) + catalog.dropFunction(FunctionIdentifier("func2"), ignoreIfNotExists = false) + assert(catalog.externalCatalog.listFunctions("db2", "*").isEmpty) + } } test("drop function when database/function does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.dropFunction( - FunctionIdentifier("something", Some("unknown_db")), ignoreIfNotExists = false) - } - intercept[NoSuchFunctionException] { - catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.dropFunction( + FunctionIdentifier("something", Some("unknown_db")), ignoreIfNotExists = false) + } + intercept[NoSuchFunctionException] { + catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false) + } + catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true) } - catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true) } test("drop temp function") { - val catalog = new SessionCatalog(newBasicCatalog()) - val info = new ExpressionInfo("tempFunc", "func1") - val tempFunc = (e: Seq[Expression]) => e.head - catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false) - val arguments = Seq(Literal(1), Literal(2), Literal(3)) - assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1)) - catalog.dropTempFunction("func1", ignoreIfNotExists = false) - intercept[NoSuchFunctionException] { - catalog.lookupFunction(FunctionIdentifier("func1"), arguments) - } - intercept[NoSuchTempFunctionException] { + withBasicCatalog { catalog => + val info = new ExpressionInfo("tempFunc", "func1") + val tempFunc = (e: Seq[Expression]) => e.head + catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false) + val arguments = Seq(Literal(1), Literal(2), Literal(3)) + assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1)) catalog.dropTempFunction("func1", ignoreIfNotExists = false) + intercept[NoSuchFunctionException] { + catalog.lookupFunction(FunctionIdentifier("func1"), arguments) + } + intercept[NoSuchTempFunctionException] { + catalog.dropTempFunction("func1", ignoreIfNotExists = false) + } + catalog.dropTempFunction("func1", ignoreIfNotExists = true) } - catalog.dropTempFunction("func1", ignoreIfNotExists = true) } test("get function") { - val catalog = new SessionCatalog(newBasicCatalog()) - val expected = - CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass, - Seq.empty[FunctionResource]) - assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected) - // Get function without explicitly specifying database - catalog.setCurrentDatabase("db2") - assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected) + withBasicCatalog { catalog => + val expected = + CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass, + Seq.empty[FunctionResource]) + assert(catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("db2"))) == expected) + // Get function without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.getFunctionMetadata(FunctionIdentifier("func1")) == expected) + } } test("get function when database/function does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("unknown_db"))) - } - intercept[NoSuchFunctionException] { - catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2"))) + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("unknown_db"))) + } + intercept[NoSuchFunctionException] { + catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2"))) + } } } test("lookup temp function") { - val catalog = new SessionCatalog(newBasicCatalog()) - val info1 = new ExpressionInfo("tempFunc1", "func1") - val tempFunc1 = (e: Seq[Expression]) => e.head - catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) - assert(catalog.lookupFunction( - FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1)) - catalog.dropTempFunction("func1", ignoreIfNotExists = false) - intercept[NoSuchFunctionException] { - catalog.lookupFunction(FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) + withBasicCatalog { catalog => + val info1 = new ExpressionInfo("tempFunc1", "func1") + val tempFunc1 = (e: Seq[Expression]) => e.head + catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) + assert(catalog.lookupFunction( + FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1)) + catalog.dropTempFunction("func1", ignoreIfNotExists = false) + intercept[NoSuchFunctionException] { + catalog.lookupFunction(FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) + } } } test("list functions") { - val catalog = new SessionCatalog(newBasicCatalog()) - val info1 = new ExpressionInfo("tempFunc1", "func1") - val info2 = new ExpressionInfo("tempFunc2", "yes_me") - val tempFunc1 = (e: Seq[Expression]) => e.head - val tempFunc2 = (e: Seq[Expression]) => e.last - catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) - catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false) - catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) - catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false) - assert(catalog.listFunctions("db1", "*").map(_._1).toSet == - Set(FunctionIdentifier("func1"), - FunctionIdentifier("yes_me"))) - assert(catalog.listFunctions("db2", "*").map(_._1).toSet == - Set(FunctionIdentifier("func1"), - FunctionIdentifier("yes_me"), - FunctionIdentifier("func1", Some("db2")), - FunctionIdentifier("func2", Some("db2")), - FunctionIdentifier("not_me", Some("db2")))) - assert(catalog.listFunctions("db2", "func*").map(_._1).toSet == - Set(FunctionIdentifier("func1"), - FunctionIdentifier("func1", Some("db2")), - FunctionIdentifier("func2", Some("db2")))) + withBasicCatalog { catalog => + val info1 = new ExpressionInfo("tempFunc1", "func1") + val info2 = new ExpressionInfo("tempFunc2", "yes_me") + val tempFunc1 = (e: Seq[Expression]) => e.head + val tempFunc2 = (e: Seq[Expression]) => e.last + catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) + catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false) + catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) + catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false) + assert(catalog.listFunctions("db1", "*").map(_._1).toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("yes_me"))) + assert(catalog.listFunctions("db2", "*").map(_._1).toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("yes_me"), + FunctionIdentifier("func1", Some("db2")), + FunctionIdentifier("func2", Some("db2")), + FunctionIdentifier("not_me", Some("db2")))) + assert(catalog.listFunctions("db2", "func*").map(_._1).toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("func1", Some("db2")), + FunctionIdentifier("func2", Some("db2")))) + } } test("list functions when database does not exist") { - val catalog = new SessionCatalog(newBasicCatalog()) - intercept[NoSuchDatabaseException] { - catalog.listFunctions("unknown_db", "func*") + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.listFunctions("unknown_db", "func*") + } } } test("clone SessionCatalog - temp views") { - val externalCatalog = newEmptyCatalog() - val original = new SessionCatalog(externalCatalog) - val tempTable1 = Range(1, 10, 1, 10) - original.createTempView("copytest1", tempTable1, overrideIfExists = false) + withEmptyCatalog { original => + val tempTable1 = Range(1, 10, 1, 10) + original.createTempView("copytest1", tempTable1, overrideIfExists = false) - // check if tables copied over - val clone = original.newSessionCatalogWith( - SimpleCatalystConf(caseSensitiveAnalysis = true), - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) - assert(original ne clone) - assert(clone.getTempView("copytest1") == Some(tempTable1)) + // check if tables copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getTempView("copytest1") == Some(tempTable1)) - // check if clone and original independent - clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) - assert(original.getTempView("copytest1") == Some(tempTable1)) + // check if clone and original independent + clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = false, purge = false) + assert(original.getTempView("copytest1") == Some(tempTable1)) - val tempTable2 = Range(1, 20, 2, 10) - original.createTempView("copytest2", tempTable2, overrideIfExists = false) - assert(clone.getTempView("copytest2").isEmpty) + val tempTable2 = Range(1, 20, 2, 10) + original.createTempView("copytest2", tempTable2, overrideIfExists = false) + assert(clone.getTempView("copytest2").isEmpty) + } } test("clone SessionCatalog - current db") { - val externalCatalog = newEmptyCatalog() - val db1 = "db1" - val db2 = "db2" - val db3 = "db3" - - externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true) - externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true) - externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true) - - val original = new SessionCatalog(externalCatalog) - original.setCurrentDatabase(db1) - - // check if current db copied over - val clone = original.newSessionCatalogWith( - SimpleCatalystConf(caseSensitiveAnalysis = true), - new Configuration(), - new SimpleFunctionRegistry, - CatalystSqlParser) - assert(original ne clone) - assert(clone.getCurrentDatabase == db1) - - // check if clone and original independent - clone.setCurrentDatabase(db2) - assert(original.getCurrentDatabase == db1) - original.setCurrentDatabase(db3) - assert(clone.getCurrentDatabase == db2) + withEmptyCatalog { original => + val db1 = "db1" + val db2 = "db2" + val db3 = "db3" + + original.externalCatalog.createDatabase(newDb(db1), ignoreIfExists = true) + original.externalCatalog.createDatabase(newDb(db2), ignoreIfExists = true) + original.externalCatalog.createDatabase(newDb(db3), ignoreIfExists = true) + + original.setCurrentDatabase(db1) + + // check if current db copied over + val clone = original.newSessionCatalogWith( + SimpleCatalystConf(caseSensitiveAnalysis = true), + new Configuration(), + new SimpleFunctionRegistry, + CatalystSqlParser) + assert(original ne clone) + assert(clone.getCurrentDatabase == db1) + + // check if clone and original independent + clone.setCurrentDatabase(db2) + assert(original.getCurrentDatabase == db1) + original.setCurrentDatabase(db3) + assert(clone.getCurrentDatabase == db2) + } } test("SPARK-19737: detect undefined functions without triggering relation resolution") { @@ -1258,18 +1363,22 @@ class SessionCatalogSuite extends PlanTest { Seq(true, false) foreach { caseSensitive => val conf = SimpleCatalystConf(caseSensitive) val catalog = new SessionCatalog(newBasicCatalog(), new SimpleFunctionRegistry, conf) - val analyzer = new Analyzer(catalog, conf) - - // The analyzer should report the undefined function rather than the undefined table first. - val cause = intercept[AnalysisException] { - analyzer.execute( - UnresolvedRelation(TableIdentifier("undefined_table")).select( - UnresolvedFunction("undefined_fn", Nil, isDistinct = false) + try { + val analyzer = new Analyzer(catalog, conf) + + // The analyzer should report the undefined function rather than the undefined table first. + val cause = intercept[AnalysisException] { + analyzer.execute( + UnresolvedRelation(TableIdentifier("undefined_table")).select( + UnresolvedFunction("undefined_fn", Nil, isDistinct = false) + ) ) - ) - } + } - assert(cause.getMessage.contains("Undefined function: 'undefined_fn'")) + assert(cause.getMessage.contains("Undefined function: 'undefined_fn'")) + } finally { + catalog.reset() + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalSessionCatalogSuite.scala new file mode 100644 index 0000000000..285f35b0b0 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalSessionCatalogSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.catalog.{CatalogTestUtils, ExternalCatalog, SessionCatalogSuite} +import org.apache.spark.sql.hive.test.TestHiveSingleton + +class HiveExternalSessionCatalogSuite extends SessionCatalogSuite with TestHiveSingleton { + + protected override val isHiveExternalCatalog = true + + private val externalCatalog = { + val catalog = spark.sharedState.externalCatalog + catalog.asInstanceOf[HiveExternalCatalog].client.reset() + catalog + } + + protected val utils = new CatalogTestUtils { + override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" + override val tableOutputFormat: String = + "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat" + override val defaultProvider: String = "hive" + override def newEmptyCatalog(): ExternalCatalog = externalCatalog + } +} -- cgit v1.2.3