diff options
22 files changed, 268 insertions, 109 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 6714846e8c..35fc6ddacb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -71,7 +71,7 @@ abstract class ExternalCatalog { def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit - def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit + def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit def renameTable(db: String, oldName: String, newName: String): Unit @@ -125,7 +125,8 @@ abstract class ExternalCatalog { db: String, table: String, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit + ignoreIfNotExists: Boolean, + purge: Boolean): Unit /** * Override the specs of one or many existing table partitions, assuming they exist. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index fb3e1b3637..67a90c8895 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -220,7 +220,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E override def dropTable( db: String, table: String, - ignoreIfNotExists: Boolean): Unit = synchronized { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = synchronized { requireDbExists(db) if (tableExists(db, table)) { if (getTable(db, table).tableType == CatalogTableType.MANAGED) { @@ -358,7 +359,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E db: String, table: String, partSpecs: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = synchronized { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = synchronized { requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfNotExists) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index c0ebb2b1fa..134fc4e698 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -397,7 +397,10 @@ class SessionCatalog( * If no database is specified, this will first attempt to drop a temporary table with * the same name, then, if that does not exist, drop the table from the current database. */ - def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized { + def dropTable( + name: TableIdentifier, + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) if (name.database.isDefined || !tempTables.contains(table)) { @@ -405,7 +408,7 @@ class SessionCatalog( // When ignoreIfNotExists is false, no exception is issued when the table does not exist. // Instead, log it as an error message. if (tableExists(TableIdentifier(table, Option(db)))) { - externalCatalog.dropTable(db, table, ignoreIfNotExists = true) + externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge) } else if (!ignoreIfNotExists) { throw new NoSuchTableException(db = db, table = table) } @@ -550,13 +553,14 @@ class SessionCatalog( def dropPartitions( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = { requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) - externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists) + externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge) } /** @@ -908,7 +912,7 @@ class SessionCatalog( dropDatabase(db, ignoreIfNotExists = false, cascade = true) } listTables(DEFAULT_DATABASE).foreach { table => - dropTable(table, ignoreIfNotExists = false) + dropTable(table, ignoreIfNotExists = false, purge = false) } listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func => if (func.database.isDefined) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 0c4d363365..a9268535c4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -99,8 +99,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac test("drop database when the database is not empty") { // Throw exception if there are functions left val catalog1 = newBasicCatalog() - catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false) - catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false) + catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false) + catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false) intercept[AnalysisException] { catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) } @@ -164,7 +164,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac test("drop table") { val catalog = newBasicCatalog() assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false) + catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false) assert(catalog.listTables("db2").toSet == Set("tbl2")) } @@ -172,16 +172,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() // Should always throw exception when the database does not exist intercept[AnalysisException] { - catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false) + catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false, purge = false) } intercept[AnalysisException] { - catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true) + catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true, purge = false) } // Should throw exception when the table does not exist, if ignoreIfNotExists is false intercept[AnalysisException] { - catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false) + catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false, purge = false) } - catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true) + catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false) } test("rename table") { @@ -292,13 +292,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) catalog.dropPartitions( - "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false) + "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false) assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) resetState() val catalog2 = newBasicCatalog() assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) catalog2.dropPartitions( - "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false) + "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false) assert(catalog2.listPartitions("db2", "tbl2").isEmpty) } @@ -306,11 +306,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.dropPartitions( - "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false) + "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false) } intercept[AnalysisException] { catalog.dropPartitions( - "db2", "does_not_exist", Seq(), ignoreIfNotExists = false) + "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false) } } @@ -318,10 +318,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val catalog = newBasicCatalog() intercept[AnalysisException] { catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false) + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false) } catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true) + "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false) } test("get partition") { @@ -561,7 +561,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(!exists(db.locationUri, "my_table")) assert(exists(db.locationUri, "your_table")) - catalog.dropTable("db1", "your_table", ignoreIfNotExists = false) + catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false) assert(!exists(db.locationUri, "your_table")) val externalTable = CatalogTable( @@ -600,7 +600,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(!exists(databaseDir, "tbl", "a=1", "b=2")) assert(exists(databaseDir, "tbl", "a=5", "b=6")) - catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false) + catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false, + purge = false) assert(!exists(databaseDir, "tbl", "a=3", "b=4")) assert(!exists(databaseDir, "tbl", "a=5", "b=6")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index adce5df81c..b31b4406ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -98,8 +98,8 @@ class SessionCatalogSuite extends SparkFunSuite { // Throw exception if there are functions left val externalCatalog1 = newBasicCatalog() val sessionCatalog1 = new SessionCatalog(externalCatalog1) - externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false) - externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false) + externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false) + externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false) intercept[AnalysisException] { sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) } @@ -217,11 +217,12 @@ class SessionCatalogSuite extends SparkFunSuite { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) - sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, + purge = false) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) // Drop table without explicitly specifying database sessionCatalog.setCurrentDatabase("db2") - sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false) assert(externalCatalog.listTables("db2").isEmpty) } @@ -229,15 +230,19 @@ class SessionCatalogSuite extends SparkFunSuite { val catalog = new SessionCatalog(newBasicCatalog()) // Should always throw exception when the database does not exist intercept[NoSuchDatabaseException] { - catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false) + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false, + purge = false) } intercept[NoSuchDatabaseException] { - catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true) + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true, + purge = false) } intercept[NoSuchTableException] { - catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false) + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false, + purge = false) } - catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true) + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true, + purge = false) } test("drop temp table") { @@ -249,16 +254,17 @@ class SessionCatalogSuite extends SparkFunSuite { assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be dropped first - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.getTempTable("tbl1") == None) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If temp table does not exist, the table in the current database should be dropped - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) // If database is specified, temp tables are never dropped sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) - sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false, + purge = false) assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) } @@ -394,7 +400,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", tempTable1)) // Then, if that does not exist, look up the relation in the current database - sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1))) } @@ -575,14 +581,16 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2))) // Drop partitions without explicitly specifying database sessionCatalog.setCurrentDatabase("db2") sessionCatalog.dropPartitions( TableIdentifier("tbl2"), Seq(part2.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) // Drop multiple partitions at once sessionCatalog.createPartitions( @@ -591,7 +599,8 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) } @@ -601,13 +610,15 @@ class SessionCatalogSuite extends SparkFunSuite { catalog.dropPartitions( TableIdentifier("tbl1", Some("unknown_db")), Seq(), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) } intercept[NoSuchTableException] { catalog.dropPartitions( TableIdentifier("does_not_exist", Some("db2")), Seq(), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) } } @@ -617,12 +628,14 @@ class SessionCatalogSuite extends SparkFunSuite { catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) } catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), - ignoreIfNotExists = true) + ignoreIfNotExists = true, + purge = false) } test("drop partitions with invalid partition spec") { @@ -631,7 +644,8 @@ class SessionCatalogSuite extends SparkFunSuite { catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(partWithMoreColumns.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) } assert(e.getMessage.contains( "Partition spec is invalid. The spec (a, b, c) must be contained within " + @@ -640,7 +654,8 @@ class SessionCatalogSuite extends SparkFunSuite { catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(partWithUnknownColumns.spec), - ignoreIfNotExists = false) + ignoreIfNotExists = false, + purge = false) } assert(e.getMessage.contains( "Partition spec is invalid. The spec (a, unknown) must be contained within " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f77801fd86..c5f4d58da4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -622,13 +622,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { * Create a [[DropTableCommand]] command. */ override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) { - if (ctx.PURGE != null) { - operationNotAllowed("DROP TABLE ... PURGE", ctx) - } DropTableCommand( visitTableIdentifier(ctx.tableIdentifier), ctx.EXISTS != null, - ctx.VIEW != null) + ctx.VIEW != null, + ctx.PURGE != null) } /** @@ -768,13 +766,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.VIEW != null) { operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx) } - if (ctx.PURGE != null) { - operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx) - } AlterTableDropPartitionCommand( visitTableIdentifier(ctx.tableIdentifier), ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), - ctx.EXISTS != null) + ctx.EXISTS != null, + ctx.PURGE != null) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 226f61ef40..a3a057a562 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -179,7 +179,8 @@ case class DescribeDatabaseCommand( case class DropTableCommand( tableName: TableIdentifier, ifExists: Boolean, - isView: Boolean) extends RunnableCommand { + isView: Boolean, + purge: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog @@ -207,7 +208,7 @@ case class DropTableCommand( case NonFatal(e) => log.warn(e.toString, e) } catalog.refreshTable(tableName) - catalog.dropTable(tableName, ifExists) + catalog.dropTable(tableName, ifExists, purge) } Seq.empty[Row] } @@ -408,7 +409,8 @@ case class AlterTableRenamePartitionCommand( case class AlterTableDropPartitionCommand( tableName: TableIdentifier, specs: Seq[TablePartitionSpec], - ifExists: Boolean) + ifExists: Boolean, + purge: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { @@ -418,7 +420,7 @@ case class AlterTableDropPartitionCommand( throw new AnalysisException( "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API") } - catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists) + catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index a6ae6fe2aa..1ae9b5524c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -297,7 +297,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def dropTempView(viewName: String): Unit = { sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName)) - sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true) + sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index e57c1716a5..001c1a1d85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { Row("listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) } @@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { .collect().toSeq == Row("listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 23c2bef53e..b170a3a77e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest { val parsed1_table = parser.parsePlan(sql1_table) val parsed2_table = parser.parsePlan(sql2_table) - assertUnsupported(sql1_table + " PURGE") - assertUnsupported(sql2_table + " PURGE") + val parsed1_purge = parser.parsePlan(sql1_table + " PURGE") assertUnsupported(sql1_view) assertUnsupported(sql2_view) @@ -623,11 +622,14 @@ class DDLCommandSuite extends PlanTest { Seq( Map("dt" -> "2008-08-08", "country" -> "us"), Map("dt" -> "2009-09-09", "country" -> "uk")), - ifExists = true) + ifExists = true, + purge = false) val expected2_table = expected1_table.copy(ifExists = false) + val expected1_purge = expected1_table.copy(purge = true) comparePlans(parsed1_table, expected1_table) comparePlans(parsed2_table, expected2_table) + comparePlans(parsed1_purge, expected1_purge) } test("alter table: archive partition (not supported)") { @@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest { val tableName1 = "db.tab" val tableName2 = "tab" - val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1") - val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1") - val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2") - val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2") - assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE") - - val expected1 = - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false) - val expected2 = - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false) - val expected3 = - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false) - val expected4 = - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - comparePlans(parsed3, expected3) - comparePlans(parsed4, expected4) + val parsed = Seq( + s"DROP TABLE $tableName1", + s"DROP TABLE IF EXISTS $tableName1", + s"DROP TABLE $tableName2", + s"DROP TABLE IF EXISTS $tableName2", + s"DROP TABLE $tableName2 PURGE", + s"DROP TABLE IF EXISTS $tableName2 PURGE" + ).map(parser.parsePlan) + + val expected = Seq( + DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, + purge = true), + DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, + purge = true)) + + parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) } } test("drop view") { @@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest { val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2") val expected1 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true) + DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true, + purge = false) val expected2 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true) + DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true, + purge = false) val expected3 = - DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true) + DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true, + purge = false) val expected4 = - DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true) + DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true, + purge = false) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b4294ed7ff..169250d9bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { }.getMessage assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist")) - catalog.dropTable(tableIdent1, ignoreIfNotExists = false) + catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false) assert(catalog.listDatabases().contains(dbName)) sql(s"DROP DATABASE $dbName RESTRICT") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 83d10010f9..7c394e0b0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple)) } spark.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("overwriting") { @@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext checkAnswer(spark.table("t"), data.map(Row.fromTuple)) } spark.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("SPARK-15678: not use cache on overwrite") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index d75df56dd6..07aeaeb695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -62,7 +62,7 @@ class CatalogSuite } private def dropTable(name: String, db: Option[String] = None): Unit = { - sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false) } private def createFunction(name: String, db: Option[String] = None): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index b8bc9ab900..cf2b92fb89 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -192,9 +192,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu override def dropTable( db: String, table: String, - ignoreIfNotExists: Boolean): Unit = withClient { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = withClient { requireDbExists(db) - client.dropTable(db, table, ignoreIfNotExists) + client.dropTable(db, table, ignoreIfNotExists, purge) } override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { @@ -295,9 +296,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu db: String, table: String, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = withClient { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = withClient { requireTableExists(db, table) - client.dropPartitions(db, table, parts, ignoreIfNotExists) + client.dropPartitions(db, table, parts, ignoreIfNotExists, purge) } override def renamePartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 5f89696918..6f009d714b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -80,7 +80,7 @@ private[hive] trait HiveClient { def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit /** Drop the specified table. */ - def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit + def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit /** Alter a table whose name matches the one specified in `table`, assuming it exists. */ final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table) @@ -121,7 +121,8 @@ private[hive] trait HiveClient { db: String, table: String, specs: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit + ignoreIfNotExists: Boolean, + purge: Boolean): Unit /** * Rename one or many existing table partitions, assuming they exist. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 1c89d8c62a..7e0cef3e35 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -406,8 +406,9 @@ private[hive] class HiveClientImpl( override def dropTable( dbName: String, tableName: String, - ignoreIfNotExists: Boolean): Unit = withHiveState { - client.dropTable(dbName, tableName, true, ignoreIfNotExists) + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = withHiveState { + shim.dropTable(client, dbName, tableName, true, ignoreIfNotExists, purge) } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { @@ -429,7 +430,8 @@ private[hive] class HiveClientImpl( db: String, table: String, specs: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit = withHiveState { + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = withHiveState { // TODO: figure out how to drop multiple partitions in one call val hiveTable = client.getTable(db, table, true /* throw exception */) // do the check at first and collect all the matching partitions @@ -450,7 +452,7 @@ private[hive] class HiveClientImpl( matchingParts.foreach { partition => try { val deleteData = true - client.dropPartition(db, table, partition, deleteData) + shim.dropPartition(client, db, table, partition, deleteData, purge) } catch { case e: Exception => val remainingParts = matchingParts.toBuffer -- droppedParts diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 9df4a26d55..41527fcd05 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.client import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong} -import java.lang.reflect.{Method, Modifier} +import java.lang.reflect.{InvocationTargetException, Method, Modifier} import java.net.URI import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet} import java.util.concurrent.TimeUnit @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{IntegralType, StringType} - +import org.apache.spark.util.Utils /** * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used @@ -129,6 +129,22 @@ private[client] sealed abstract class Shim { def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit + def dropTable( + hive: Hive, + dbName: String, + tableName: String, + deleteData: Boolean, + ignoreIfNotExists: Boolean, + purge: Boolean): Unit + + def dropPartition( + hive: Hive, + dbName: String, + tableName: String, + part: JList[String], + deleteData: Boolean, + purge: Boolean): Unit + protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { val method = findMethod(klass, name, args: _*) require(Modifier.isStatic(method.getModifiers()), @@ -343,6 +359,32 @@ private[client] class Shim_v0_12 extends Shim with Logging { dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean) } + override def dropTable( + hive: Hive, + dbName: String, + tableName: String, + deleteData: Boolean, + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = { + if (purge) { + throw new UnsupportedOperationException("DROP TABLE ... PURGE") + } + hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists) + } + + override def dropPartition( + hive: Hive, + dbName: String, + tableName: String, + part: JList[String], + deleteData: Boolean, + purge: Boolean): Unit = { + if (purge) { + throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE") + } + hive.dropPartition(dbName, tableName, part, deleteData) + } + override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = { throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " + "Please use Hive 0.13 or higher.") @@ -599,6 +641,15 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE) + private lazy val dropTableMethod = + findMethod( + classOf[Hive], + "dropTable", + classOf[String], + classOf[String], + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE) private lazy val getTimeVarMethod = findMethod( classOf[HiveConf], @@ -643,6 +694,21 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE) } + override def dropTable( + hive: Hive, + dbName: String, + tableName: String, + deleteData: Boolean, + ignoreIfNotExists: Boolean, + purge: Boolean): Unit = { + try { + dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean, + ignoreIfNotExists: JBoolean, purge: JBoolean) + } catch { + case e: InvocationTargetException => throw e.getCause() + } + } + override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = { getTimeVarMethod.invoke( conf, @@ -696,6 +762,19 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { JBoolean.TYPE, JLong.TYPE) + private lazy val dropOptionsClass = + Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions") + private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData") + private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData") + private lazy val dropPartitionMethod = + findMethod( + classOf[Hive], + "dropPartition", + classOf[String], + classOf[String], + classOf[JList[String]], + dropOptionsClass) + override def loadDynamicPartitions( hive: Hive, loadPath: Path, @@ -710,4 +789,21 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { 0L: JLong) } + override def dropPartition( + hive: Hive, + dbName: String, + tableName: String, + part: JList[String], + deleteData: Boolean, + purge: Boolean): Unit = { + val dropOptions = dropOptionsClass.newInstance().asInstanceOf[Object] + dropOptionsDeleteData.setBoolean(dropOptions, deleteData) + dropOptionsPurge.setBoolean(dropOptions, purge) + try { + dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions) + } catch { + case e: InvocationTargetException => throw e.getCause() + } + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 15a5d79dcb..2762e0cdd5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -95,7 +95,8 @@ case class CreateHiveTableAsSelectCommand( } catch { case NonFatal(e) => // drop the created table. - sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true) + sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true, + purge = false) throw e } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 8dc756b938..6eeb67510c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -43,7 +43,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft override def afterAll(): Unit = { try { sessionState.catalog.dropTable( - TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true, purge = false) sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a5975cf483..b275ab17a9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -165,7 +165,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils sql("ANALYZE TABLE tempTable COMPUTE STATISTICS") } spark.sessionState.catalog.dropTable( - TableIdentifier("tempTable"), ignoreIfNotExists = true) + TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 5b209acf0f..a972f61e25 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -249,7 +249,19 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: dropTable") { - client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false) + val versionsWithoutPurge = versions.takeWhile(_ != "0.14") + // First try with the purge option set. This should fail if the version is < 0.14, in which + // case we check the version and try without it. + try { + client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, + purge = true) + assert(!versionsWithoutPurge.contains(version)) + } catch { + case _: UnsupportedOperationException => + assert(versionsWithoutPurge.contains(version)) + client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, + purge = false) + } assert(client.listTables("default") === Seq("src")) } @@ -366,7 +378,20 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: dropPartitions") { val spec = Map("key1" -> "1", "key2" -> "3") - client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true) + val versionsWithoutPurge = versions.takeWhile(_ != "1.2") + // Similar to dropTable; try with purge set, and if it fails, make sure we're running + // with a version that is older than the minimum (1.2 in this case). + try { + client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, + purge = true) + assert(!versionsWithoutPurge.contains(version)) + } catch { + case _: UnsupportedOperationException => + assert(versionsWithoutPurge.contains(version)) + client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, + purge = false) + } + assert(client.getPartitionOption("default", "src_part", spec).isEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index af8115cf9d..b9e98fc85f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) + sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("overwriting") { @@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) + sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("self-join") { |