aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala5
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala14
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala31
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala57
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala63
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala100
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala3
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala29
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala4
22 files changed, 268 insertions, 109 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 6714846e8c..35fc6ddacb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -71,7 +71,7 @@ abstract class ExternalCatalog {
def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
- def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+ def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit
@@ -125,7 +125,8 @@ abstract class ExternalCatalog {
db: String,
table: String,
parts: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
/**
* Override the specs of one or many existing table partitions, assuming they exist.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index fb3e1b3637..67a90c8895 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -220,7 +220,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
override def dropTable(
db: String,
table: String,
- ignoreIfNotExists: Boolean): Unit = synchronized {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
requireDbExists(db)
if (tableExists(db, table)) {
if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
@@ -358,7 +359,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
db: String,
table: String,
partSpecs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = synchronized {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c0ebb2b1fa..134fc4e698 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -397,7 +397,10 @@ class SessionCatalog(
* If no database is specified, this will first attempt to drop a temporary table with
* the same name, then, if that does not exist, drop the table from the current database.
*/
- def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized {
+ def dropTable(
+ name: TableIdentifier,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
@@ -405,7 +408,7 @@ class SessionCatalog(
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
// Instead, log it as an error message.
if (tableExists(TableIdentifier(table, Option(db)))) {
- externalCatalog.dropTable(db, table, ignoreIfNotExists = true)
+ externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
} else if (!ignoreIfNotExists) {
throw new NoSuchTableException(db = db, table = table)
}
@@ -550,13 +553,14 @@ class SessionCatalog(
def dropPartitions(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
- externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
}
/**
@@ -908,7 +912,7 @@ class SessionCatalog(
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
listTables(DEFAULT_DATABASE).foreach { table =>
- dropTable(table, ignoreIfNotExists = false)
+ dropTable(table, ignoreIfNotExists = false, purge = false)
}
listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
if (func.database.isDefined) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 0c4d363365..a9268535c4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -99,8 +99,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("drop database when the database is not empty") {
// Throw exception if there are functions left
val catalog1 = newBasicCatalog()
- catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
- catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+ catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
+ catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
intercept[AnalysisException] {
catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
@@ -164,7 +164,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
test("drop table") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
- catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+ catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
assert(catalog.listTables("db2").toSet == Set("tbl2"))
}
@@ -172,16 +172,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
// Should always throw exception when the database does not exist
intercept[AnalysisException] {
- catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false, purge = false)
}
intercept[AnalysisException] {
- catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+ catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true, purge = false)
}
// Should throw exception when the table does not exist, if ignoreIfNotExists is false
intercept[AnalysisException] {
- catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false, purge = false)
}
- catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+ catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
}
test("rename table") {
@@ -292,13 +292,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions(
- "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+ "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
resetState()
val catalog2 = newBasicCatalog()
assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions(
- "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
@@ -306,11 +306,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+ "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+ "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
}
}
@@ -318,10 +318,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
}
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
}
test("get partition") {
@@ -561,7 +561,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(!exists(db.locationUri, "my_table"))
assert(exists(db.locationUri, "your_table"))
- catalog.dropTable("db1", "your_table", ignoreIfNotExists = false)
+ catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
assert(!exists(db.locationUri, "your_table"))
val externalTable = CatalogTable(
@@ -600,7 +600,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
assert(exists(databaseDir, "tbl", "a=5", "b=6"))
- catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false)
+ catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
+ purge = false)
assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index adce5df81c..b31b4406ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -98,8 +98,8 @@ class SessionCatalogSuite extends SparkFunSuite {
// Throw exception if there are functions left
val externalCatalog1 = newBasicCatalog()
val sessionCatalog1 = new SessionCatalog(externalCatalog1)
- externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
- externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+ externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
+ externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
intercept[AnalysisException] {
sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
}
@@ -217,11 +217,12 @@ class SessionCatalogSuite extends SparkFunSuite {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
- sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+ purge = false)
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
// Drop table without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
- sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false)
assert(externalCatalog.listTables("db2").isEmpty)
}
@@ -229,15 +230,19 @@ class SessionCatalogSuite extends SparkFunSuite {
val catalog = new SessionCatalog(newBasicCatalog())
// Should always throw exception when the database does not exist
intercept[NoSuchDatabaseException] {
- catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false)
+ catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false,
+ purge = false)
}
intercept[NoSuchDatabaseException] {
- catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true)
+ catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true,
+ purge = false)
}
intercept[NoSuchTableException] {
- catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
+ catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false,
+ purge = false)
}
- catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true)
+ catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true,
+ purge = false)
}
test("drop temp table") {
@@ -249,16 +254,17 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If database is not specified, temp table should be dropped first
- sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.getTempTable("tbl1") == None)
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
// If temp table does not exist, the table in the current database should be dropped
- sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
// If database is specified, temp tables are never dropped
sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
- sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+ purge = false)
assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
}
@@ -394,7 +400,7 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1))
// Then, if that does not exist, look up the relation in the current database
- sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
}
@@ -575,14 +581,16 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
sessionCatalog.dropPartitions(
TableIdentifier("tbl2"),
Seq(part2.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
// Drop multiple partitions at once
sessionCatalog.createPartitions(
@@ -591,7 +599,8 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec, part2.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
}
@@ -601,13 +610,15 @@ class SessionCatalogSuite extends SparkFunSuite {
catalog.dropPartitions(
TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
}
intercept[NoSuchTableException] {
catalog.dropPartitions(
TableIdentifier("does_not_exist", Some("db2")),
Seq(),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
}
}
@@ -617,12 +628,14 @@ class SessionCatalogSuite extends SparkFunSuite {
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
}
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
- ignoreIfNotExists = true)
+ ignoreIfNotExists = true,
+ purge = false)
}
test("drop partitions with invalid partition spec") {
@@ -631,7 +644,8 @@ class SessionCatalogSuite extends SparkFunSuite {
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(partWithMoreColumns.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -640,7 +654,8 @@ class SessionCatalogSuite extends SparkFunSuite {
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(partWithUnknownColumns.spec),
- ignoreIfNotExists = false)
+ ignoreIfNotExists = false,
+ purge = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f77801fd86..c5f4d58da4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -622,13 +622,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* Create a [[DropTableCommand]] command.
*/
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.PURGE != null) {
- operationNotAllowed("DROP TABLE ... PURGE", ctx)
- }
DropTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.EXISTS != null,
- ctx.VIEW != null)
+ ctx.VIEW != null,
+ ctx.PURGE != null)
}
/**
@@ -768,13 +766,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.VIEW != null) {
operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
}
- if (ctx.PURGE != null) {
- operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx)
- }
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
- ctx.EXISTS != null)
+ ctx.EXISTS != null,
+ ctx.PURGE != null)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 226f61ef40..a3a057a562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -179,7 +179,8 @@ case class DescribeDatabaseCommand(
case class DropTableCommand(
tableName: TableIdentifier,
ifExists: Boolean,
- isView: Boolean) extends RunnableCommand {
+ isView: Boolean,
+ purge: Boolean) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
@@ -207,7 +208,7 @@ case class DropTableCommand(
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.refreshTable(tableName)
- catalog.dropTable(tableName, ifExists)
+ catalog.dropTable(tableName, ifExists, purge)
}
Seq.empty[Row]
}
@@ -408,7 +409,8 @@ case class AlterTableRenamePartitionCommand(
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ifExists: Boolean)
+ ifExists: Boolean,
+ purge: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -418,7 +420,7 @@ case class AlterTableDropPartitionCommand(
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
- catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+ catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index a6ae6fe2aa..1ae9b5524c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -297,7 +297,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def dropTempView(viewName: String): Unit = {
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
- sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
+ sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index e57c1716a5..001c1a1d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
@@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
.collect().toSeq == Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 23c2bef53e..b170a3a77e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest {
val parsed1_table = parser.parsePlan(sql1_table)
val parsed2_table = parser.parsePlan(sql2_table)
- assertUnsupported(sql1_table + " PURGE")
- assertUnsupported(sql2_table + " PURGE")
+ val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
assertUnsupported(sql1_view)
assertUnsupported(sql2_view)
@@ -623,11 +622,14 @@ class DDLCommandSuite extends PlanTest {
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
- ifExists = true)
+ ifExists = true,
+ purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
+ val expected1_purge = expected1_table.copy(purge = true)
comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
+ comparePlans(parsed1_purge, expected1_purge)
}
test("alter table: archive partition (not supported)") {
@@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest {
val tableName1 = "db.tab"
val tableName2 = "tab"
- val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1")
- val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1")
- val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2")
- val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2")
- assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
-
- val expected1 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
- val expected2 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
- val expected3 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
- val expected4 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
-
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
- comparePlans(parsed3, expected3)
- comparePlans(parsed4, expected4)
+ val parsed = Seq(
+ s"DROP TABLE $tableName1",
+ s"DROP TABLE IF EXISTS $tableName1",
+ s"DROP TABLE $tableName2",
+ s"DROP TABLE IF EXISTS $tableName2",
+ s"DROP TABLE $tableName2 PURGE",
+ s"DROP TABLE IF EXISTS $tableName2 PURGE"
+ ).map(parser.parsePlan)
+
+ val expected = Seq(
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = true),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = true))
+
+ parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
}
test("drop view") {
@@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest {
val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
val expected1 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
+ purge = false)
val expected2 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
+ purge = false)
val expected3 =
- DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
+ purge = false)
val expected4 =
- DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
+ purge = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b4294ed7ff..169250d9bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}.getMessage
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
- catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+ catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
assert(catalog.listDatabases().contains(dbName))
sql(s"DROP DATABASE $dbName RESTRICT")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 83d10010f9..7c394e0b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("overwriting") {
@@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), data.map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("SPARK-15678: not use cache on overwrite") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index d75df56dd6..07aeaeb695 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -62,7 +62,7 @@ class CatalogSuite
}
private def dropTable(name: String, db: Option[String] = None): Unit = {
- sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
}
private def createFunction(name: String, db: Option[String] = None): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b8bc9ab900..cf2b92fb89 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -192,9 +192,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
override def dropTable(
db: String,
table: String,
- ignoreIfNotExists: Boolean): Unit = withClient {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withClient {
requireDbExists(db)
- client.dropTable(db, table, ignoreIfNotExists)
+ client.dropTable(db, table, ignoreIfNotExists, purge)
}
override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
@@ -295,9 +296,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
db: String,
table: String,
parts: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = withClient {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withClient {
requireTableExists(db, table)
- client.dropPartitions(db, table, parts, ignoreIfNotExists)
+ client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
}
override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 5f89696918..6f009d714b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -80,7 +80,7 @@ private[hive] trait HiveClient {
def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
/** Drop the specified table. */
- def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+ def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
/** Alter a table whose name matches the one specified in `table`, assuming it exists. */
final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table)
@@ -121,7 +121,8 @@ private[hive] trait HiveClient {
db: String,
table: String,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
/**
* Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 1c89d8c62a..7e0cef3e35 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -406,8 +406,9 @@ private[hive] class HiveClientImpl(
override def dropTable(
dbName: String,
tableName: String,
- ignoreIfNotExists: Boolean): Unit = withHiveState {
- client.dropTable(dbName, tableName, true, ignoreIfNotExists)
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withHiveState {
+ shim.dropTable(client, dbName, tableName, true, ignoreIfNotExists, purge)
}
override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
@@ -429,7 +430,8 @@ private[hive] class HiveClientImpl(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = withHiveState {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
// do the check at first and collect all the matching partitions
@@ -450,7 +452,7 @@ private[hive] class HiveClientImpl(
matchingParts.foreach { partition =>
try {
val deleteData = true
- client.dropPartition(db, table, partition, deleteData)
+ shim.dropPartition(client, db, table, partition, deleteData, purge)
} catch {
case e: Exception =>
val remainingParts = matchingParts.toBuffer -- droppedParts
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9df4a26d55..41527fcd05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
+import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegralType, StringType}
-
+import org.apache.spark.util.Utils
/**
* A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -129,6 +129,22 @@ private[client] sealed abstract class Shim {
def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
+ def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
+
+ def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit
+
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
require(Modifier.isStatic(method.getModifiers()),
@@ -343,6 +359,32 @@ private[client] class Shim_v0_12 extends Shim with Logging {
dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
}
+ override def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ if (purge) {
+ throw new UnsupportedOperationException("DROP TABLE ... PURGE")
+ }
+ hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
+ }
+
+ override def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit = {
+ if (purge) {
+ throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE")
+ }
+ hive.dropPartition(dbName, tableName, part, deleteData)
+ }
+
override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
"Please use Hive 0.13 or higher.")
@@ -599,6 +641,15 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
+ private lazy val dropTableMethod =
+ findMethod(
+ classOf[Hive],
+ "dropTable",
+ classOf[String],
+ classOf[String],
+ JBoolean.TYPE,
+ JBoolean.TYPE,
+ JBoolean.TYPE)
private lazy val getTimeVarMethod =
findMethod(
classOf[HiveConf],
@@ -643,6 +694,21 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}
+ override def dropTable(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ deleteData: Boolean,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
+ try {
+ dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+ ignoreIfNotExists: JBoolean, purge: JBoolean)
+ } catch {
+ case e: InvocationTargetException => throw e.getCause()
+ }
+ }
+
override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
getTimeVarMethod.invoke(
conf,
@@ -696,6 +762,19 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
JBoolean.TYPE,
JLong.TYPE)
+ private lazy val dropOptionsClass =
+ Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
+ private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
+ private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
+ private lazy val dropPartitionMethod =
+ findMethod(
+ classOf[Hive],
+ "dropPartition",
+ classOf[String],
+ classOf[String],
+ classOf[JList[String]],
+ dropOptionsClass)
+
override def loadDynamicPartitions(
hive: Hive,
loadPath: Path,
@@ -710,4 +789,21 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
0L: JLong)
}
+ override def dropPartition(
+ hive: Hive,
+ dbName: String,
+ tableName: String,
+ part: JList[String],
+ deleteData: Boolean,
+ purge: Boolean): Unit = {
+ val dropOptions = dropOptionsClass.newInstance().asInstanceOf[Object]
+ dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
+ dropOptionsPurge.setBoolean(dropOptions, purge)
+ try {
+ dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
+ } catch {
+ case e: InvocationTargetException => throw e.getCause()
+ }
+ }
+
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 15a5d79dcb..2762e0cdd5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -95,7 +95,8 @@ case class CreateHiveTableAsSelectCommand(
} catch {
case NonFatal(e) =>
// drop the created table.
- sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true)
+ sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
+ purge = false)
throw e
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 8dc756b938..6eeb67510c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -43,7 +43,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft
override def afterAll(): Unit = {
try {
sessionState.catalog.dropTable(
- TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
+ TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true, purge = false)
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a5975cf483..b275ab17a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -165,7 +165,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tempTable"), ignoreIfNotExists = true)
+ TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5b209acf0f..a972f61e25 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -249,7 +249,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
test(s"$version: dropTable") {
- client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false)
+ val versionsWithoutPurge = versions.takeWhile(_ != "0.14")
+ // First try with the purge option set. This should fail if the version is < 0.14, in which
+ // case we check the version and try without it.
+ try {
+ client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+ purge = true)
+ assert(!versionsWithoutPurge.contains(version))
+ } catch {
+ case _: UnsupportedOperationException =>
+ assert(versionsWithoutPurge.contains(version))
+ client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+ purge = false)
+ }
assert(client.listTables("default") === Seq("src"))
}
@@ -366,7 +378,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: dropPartitions") {
val spec = Map("key1" -> "1", "key2" -> "3")
- client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true)
+ val versionsWithoutPurge = versions.takeWhile(_ != "1.2")
+ // Similar to dropTable; try with purge set, and if it fails, make sure we're running
+ // with a version that is older than the minimum (1.2 in this case).
+ try {
+ client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+ purge = true)
+ assert(!versionsWithoutPurge.contains(version))
+ } catch {
+ case _: UnsupportedOperationException =>
+ assert(versionsWithoutPurge.contains(version))
+ client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+ purge = false)
+ }
+
assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index af8115cf9d..b9e98fc85f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+ sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("self-join") {