author    Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
commit    7f968867ff61c6b1a007874ee7e3a7421d94d373 (patch)
tree      c1d3cc77fc215789323ddcac25b6e3c571d3370f /sql/catalyst/src/main
parent    68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (diff)
[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to directly delete the files instead of moving them to the trash. This is needed in certain configurations where moving the files does not work. For non-Hive tables and partitions, Spark already behaves as if the PURGE option was set, so there's no need to do anything.

Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for partitions), so the code reflects that: trying to use the option with older versions of Hive will cause an exception to be thrown.

The change is a little noisier than I would like, because of the code to propagate the new flag through all the interfaces and implementations; the main changes are in the parser and in HiveShim, aside from the tests (DDLCommandSuite, VersionsSuite).

Tested by running sql and catalyst unit tests, plus VersionsSuite which has been updated to test the version-specific behavior. I also ran an internal test suite that uses PURGE and would not pass previously.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13831 from vanzin/SPARK-16119.
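As a usage sketch (not part of this diff), the syntax the commit enables looks like the following; spark is assumed to be a Hive-enabled SparkSession, and the table and partition names are invented for illustration:

    // Requires Hive 0.14+ for tables and Hive 1.2+ for partitions; per the
    // commit message, using PURGE against older metastores now throws an
    // exception instead of being honored.
    spark.sql("DROP TABLE logs PURGE")
    spark.sql("ALTER TABLE events DROP PARTITION (ds = '2016-07-12') PURGE")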
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala  |  5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala  |  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala   | 14
3 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 6714846e8c..35fc6ddacb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -71,7 +71,7 @@ abstract class ExternalCatalog {
def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
- def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+ def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
def renameTable(db: String, oldName: String, newName: String): Unit
@@ -125,7 +125,8 @@ abstract class ExternalCatalog {
db: String,
table: String,
parts: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit
/**
* Override the specs of one or many existing table partitions, assuming they exist.
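Every concrete ExternalCatalog must now accept the flag, even where it ignores it. A minimal hypothetical subclass (invented here, not part of the commit) that traces the flag and delegates:

    import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog

    // Hypothetical wrapper illustrating the widened contract; purge = true
    // asks the catalog to delete files outright instead of moving them to
    // the trash, which only matters for the Hive-backed implementation.
    class TracingCatalog extends InMemoryCatalog {
      override def dropTable(
          db: String,
          table: String,
          ignoreIfNotExists: Boolean,
          purge: Boolean): Unit = {
        println(s"dropTable($db.$table, purge = $purge)")
        super.dropTable(db, table, ignoreIfNotExists, purge)
      }
    }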
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index fb3e1b3637..67a90c8895 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -220,7 +220,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
override def dropTable(
db: String,
table: String,
- ignoreIfNotExists: Boolean): Unit = synchronized {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
requireDbExists(db)
if (tableExists(db, table)) {
if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
@@ -358,7 +359,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E
db: String,
table: String,
partSpecs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = synchronized {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
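Since the in-memory catalog already deletes data directly, the flag is accepted here but changes nothing. A small sketch exercising the new signature, assuming the Spark 2.0-era constructors; the database and table names are invented:

    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog}

    val catalog = new InMemoryCatalog()
    catalog.createDatabase(
      CatalogDatabase("db", "", "file:/tmp/db", Map.empty),
      ignoreIfExists = true)
    // Dropping a missing table with ignoreIfNotExists = true is a no-op;
    // purge = true compiles but is irrelevant for this catalog.
    catalog.dropTable("db", "tbl", ignoreIfNotExists = true, purge = true)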
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c0ebb2b1fa..134fc4e698 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -397,7 +397,10 @@ class SessionCatalog(
* If no database is specified, this will first attempt to drop a temporary table with
* the same name, then, if that does not exist, drop the table from the current database.
*/
- def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized {
+ def dropTable(
+ name: TableIdentifier,
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
@@ -405,7 +408,7 @@ class SessionCatalog(
// When ignoreIfNotExists is true, we silently ignore the case where the table
// does not exist. Otherwise, a NoSuchTableException is thrown below.
if (tableExists(TableIdentifier(table, Option(db)))) {
- externalCatalog.dropTable(db, table, ignoreIfNotExists = true)
+ externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
} else if (!ignoreIfNotExists) {
throw new NoSuchTableException(db = db, table = table)
}
@@ -550,13 +553,14 @@ class SessionCatalog(
def dropPartitions(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ignoreIfNotExists: Boolean): Unit = {
+ ignoreIfNotExists: Boolean,
+ purge: Boolean): Unit = {
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
- externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
}
/**
@@ -908,7 +912,7 @@ class SessionCatalog(
dropDatabase(db, ignoreIfNotExists = false, cascade = true)
}
listTables(DEFAULT_DATABASE).foreach { table =>
- dropTable(table, ignoreIfNotExists = false)
+ dropTable(table, ignoreIfNotExists = false, purge = false)
}
listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
if (func.database.isDefined) {
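Callers of SessionCatalog now pass the flag explicitly. Hypothetical call sites (the identifiers are invented, and sessionCatalog is assumed to be an already-constructed SessionCatalog):

    import org.apache.spark.sql.catalyst.TableIdentifier

    sessionCatalog.dropTable(
      TableIdentifier("logs", Some("web")),
      ignoreIfNotExists = false,
      purge = true)

    // TablePartitionSpec is an alias for Map[String, String].
    sessionCatalog.dropPartitions(
      TableIdentifier("events", Some("web")),
      Seq(Map("ds" -> "2016-07-12")),
      ignoreIfNotExists = false,
      purge = true)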