diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2016-07-12 12:47:46 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-07-12 12:47:46 -0700 |
commit | 7f968867ff61c6b1a007874ee7e3a7421d94d373 (patch) | |
tree | c1d3cc77fc215789323ddcac25b6e3c571d3370f /sql/core/src/test | |
parent | 68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (diff) | |
download | spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.tar.gz spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.tar.bz2 spark-7f968867ff61c6b1a007874ee7e3a7421d94d373.zip |
[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to directly delete the files instead of
moving them to the trash. This is needed in certain configurations
where moving the files does not work. For non-Hive tables and partitions,
Spark already behaves as if the PURGE option were set, so there's no
need to do anything.
Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for
partitions), so the code reflects that: trying to use the option with
older versions of Hive will cause an exception to be thrown.
The change is a little noisier than I would like, because of the code
to propagate the new flag through all the interfaces and implementations;
the main changes are in the parser and in HiveShim, aside from the tests
(DDLCommandSuite, VersionsSuite).
Tested by running the SQL and Catalyst unit tests, plus VersionsSuite, which
has been updated to test the version-specific behavior. I also ran an
internal test suite that uses PURGE and did not pass previously.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #13831 from vanzin/SPARK-16119.
Diffstat (limited to 'sql/core/src/test')
5 files changed, 43 insertions, 32 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index e57c1716a5..001c1a1d85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { Row("listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) } @@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { .collect().toSeq == Row("listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 23c2bef53e..b170a3a77e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest { val parsed1_table = parser.parsePlan(sql1_table) val parsed2_table = parser.parsePlan(sql2_table) - assertUnsupported(sql1_table + " PURGE") - assertUnsupported(sql2_table + " PURGE") + val parsed1_purge = parser.parsePlan(sql1_table + " PURGE") assertUnsupported(sql1_view) assertUnsupported(sql2_view) @@ -623,11 +622,14 @@ class DDLCommandSuite 
extends PlanTest { Seq( Map("dt" -> "2008-08-08", "country" -> "us"), Map("dt" -> "2009-09-09", "country" -> "uk")), - ifExists = true) + ifExists = true, + purge = false) val expected2_table = expected1_table.copy(ifExists = false) + val expected1_purge = expected1_table.copy(purge = true) comparePlans(parsed1_table, expected1_table) comparePlans(parsed2_table, expected2_table) + comparePlans(parsed1_purge, expected1_purge) } test("alter table: archive partition (not supported)") { @@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest { val tableName1 = "db.tab" val tableName2 = "tab" - val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1") - val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1") - val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2") - val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2") - assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE") - - val expected1 = - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false) - val expected2 = - DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false) - val expected3 = - DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false) - val expected4 = - DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - comparePlans(parsed3, expected3) - comparePlans(parsed4, expected4) + val parsed = Seq( + s"DROP TABLE $tableName1", + s"DROP TABLE IF EXISTS $tableName1", + s"DROP TABLE $tableName2", + s"DROP TABLE IF EXISTS $tableName2", + s"DROP TABLE $tableName2 PURGE", + s"DROP TABLE IF EXISTS $tableName2 PURGE" + ).map(parser.parsePlan) + + val expected = Seq( + DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false, + purge = false), + 
DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, + purge = false), + DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false, + purge = true), + DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false, + purge = true)) + + parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) } } test("drop view") { @@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest { val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2") val expected1 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true) + DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true, + purge = false) val expected2 = - DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true) + DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true, + purge = false) val expected3 = - DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true) + DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true, + purge = false) val expected4 = - DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true) + DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true, + purge = false) comparePlans(parsed1, expected1) comparePlans(parsed2, expected2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b4294ed7ff..169250d9bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { }.getMessage 
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist")) - catalog.dropTable(tableIdent1, ignoreIfNotExists = false) + catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false) assert(catalog.listDatabases().contains(dbName)) sql(s"DROP DATABASE $dbName RESTRICT") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 83d10010f9..7c394e0b0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple)) } spark.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("overwriting") { @@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext checkAnswer(spark.table("t"), data.map(Row.fromTuple)) } spark.sessionState.catalog.dropTable( - TableIdentifier("tmp"), ignoreIfNotExists = true) + TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) } test("SPARK-15678: not use cache on overwrite") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index d75df56dd6..07aeaeb695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -62,7 +62,7 @@ class CatalogSuite } private def dropTable(name: String, db: Option[String] = None): Unit = { - sessionCatalog.dropTable(TableIdentifier(name, 
db), ignoreIfNotExists = false) + sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false) } private def createFunction(name: String, db: Option[String] = None): Unit = { |