author    Marcelo Vanzin <vanzin@cloudera.com>    2016-07-12 12:47:46 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>    2016-07-12 12:47:46 -0700
commit    7f968867ff61c6b1a007874ee7e3a7421d94d373 (patch)
tree      c1d3cc77fc215789323ddcac25b6e3c571d3370f /sql/core/src/test
parent    68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (diff)
[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to directly delete the files instead of moving them to the trash. This is needed in certain configurations where moving the files does not work. For non-Hive tables and partitions, Spark already behaves as if the PURGE option was set, so there's no need to do anything.

Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for partitions), so the code reflects that: trying to use the option with older versions of Hive will cause an exception to be thrown.

The change is a little noisier than I would like, because of the code to propagate the new flag through all the interfaces and implementations; the main changes are in the parser and in HiveShim, aside from the tests (DDLCommandSuite, VersionsSuite).

Tested by running sql and catalyst unit tests, plus VersionsSuite which has been updated to test the version-specific behavior. I also ran an internal test suite that uses PURGE and would not pass previously.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13831 from vanzin/SPARK-16119.
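For context, a minimal sketch of the syntax this change enables, pieced together from the statements exercised in DDLCommandSuite below. The session handle and the table/partition names are hypothetical; against a Hive metastore, PURGE needs Hive 0.14+ for tables and 1.2+ for partitions, and older clients throw.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // PURGE asks Hive to delete the table's files directly instead of
    // moving them to the trash (supported for tables since Hive 0.14).
    spark.sql("DROP TABLE IF EXISTS logs PURGE")

    // The same option at partition granularity (supported since Hive 1.2).
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt = '2008-08-08') PURGE")

For non-Hive tables and partitions the statements parse the same way, but per the commit message the flag is effectively a no-op: Spark already deletes the files directly.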
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala                                  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala                | 63
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                       |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                            |  2
5 files changed, 43 insertions, 32 deletions
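Most of the churn in the hunks below is mechanical: every `dropTable` call site gains an explicit `purge` argument. From those updated call sites, the catalog method's shape can be inferred as roughly the following; this is a sketch reconstructed from the callers, not the exact declaration in SessionCatalog.

    // Inferred from the updated call sites in these tests; not copied
    // verbatim from SessionCatalog.
    def dropTable(
        name: TableIdentifier,
        ignoreIfNotExists: Boolean,  // don't fail if the table is missing
        purge: Boolean): Unit        // delete files directly, bypassing the trash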
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index e57c1716a5..001c1a1d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
@@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
.collect().toSeq == Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 23c2bef53e..b170a3a77e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest {
val parsed1_table = parser.parsePlan(sql1_table)
val parsed2_table = parser.parsePlan(sql2_table)
- assertUnsupported(sql1_table + " PURGE")
- assertUnsupported(sql2_table + " PURGE")
+ val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
assertUnsupported(sql1_view)
assertUnsupported(sql2_view)
@@ -623,11 +622,14 @@ class DDLCommandSuite extends PlanTest {
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
- ifExists = true)
+ ifExists = true,
+ purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
+ val expected1_purge = expected1_table.copy(purge = true)
comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
+ comparePlans(parsed1_purge, expected1_purge)
}
test("alter table: archive partition (not supported)") {
@@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest {
val tableName1 = "db.tab"
val tableName2 = "tab"
- val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1")
- val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1")
- val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2")
- val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2")
- assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
-
- val expected1 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
- val expected2 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
- val expected3 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
- val expected4 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
-
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
- comparePlans(parsed3, expected3)
- comparePlans(parsed4, expected4)
+ val parsed = Seq(
+ s"DROP TABLE $tableName1",
+ s"DROP TABLE IF EXISTS $tableName1",
+ s"DROP TABLE $tableName2",
+ s"DROP TABLE IF EXISTS $tableName2",
+ s"DROP TABLE $tableName2 PURGE",
+ s"DROP TABLE IF EXISTS $tableName2 PURGE"
+ ).map(parser.parsePlan)
+
+ val expected = Seq(
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = true),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = true))
+
+ parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
}
test("drop view") {
@@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest {
val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
val expected1 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
+ purge = false)
val expected2 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
+ purge = false)
val expected3 =
- DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
+ purge = false)
val expected4 =
- DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
+ purge = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b4294ed7ff..169250d9bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}.getMessage
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
- catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+ catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
assert(catalog.listDatabases().contains(dbName))
sql(s"DROP DATABASE $dbName RESTRICT")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 83d10010f9..7c394e0b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("overwriting") {
@@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), data.map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("SPARK-15678: not use cache on overwrite") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index d75df56dd6..07aeaeb695 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -62,7 +62,7 @@ class CatalogSuite
}
private def dropTable(name: String, db: Option[String] = None): Unit = {
- sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
}
private def createFunction(name: String, db: Option[String] = None): Unit = {