path: root/sql/core/src
author    Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-07-12 12:47:46 -0700
commit    7f968867ff61c6b1a007874ee7e3a7421d94d373 (patch)
tree      c1d3cc77fc215789323ddcac25b6e3c571d3370f /sql/core/src
parent    68df47aca55e99406b7b67ef3d4b1008abf1b8b6 (diff)
[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to delete the files directly instead of moving them to the trash. This is needed in certain configurations where moving the files does not work. For non-Hive tables and partitions, Spark already behaves as if the PURGE option were set, so there's no need to do anything.

Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for partitions), so the code reflects that: trying to use the option with older versions of Hive causes an exception to be thrown.

The change is a little noisier than I would like, because of the code to propagate the new flag through all the interfaces and implementations; the main changes are in the parser and in HiveShim, aside from the tests (DDLCommandSuite, VersionsSuite).

Tested by running sql and catalyst unit tests, plus VersionsSuite, which has been updated to test the version-specific behavior. I also ran an internal test suite that uses PURGE and would not pass previously.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13831 from vanzin/SPARK-16119.
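For illustration, a minimal sketch of the syntax this change enables, assuming a Hive-enabled SparkSession named `spark` and a partitioned Hive table `logs` (both hypothetical); per the commit message, these statements throw on Hive versions older than 0.14 (tables) or 1.2 (partitions):

    // PURGE bypasses the Hive trash: the files are deleted immediately
    // and cannot be recovered, so use it only where the trash move fails.
    spark.sql("ALTER TABLE logs DROP PARTITION (dt='2016-07-12') PURGE")
    spark.sql("DROP TABLE logs PURGE")

    // For non-Hive (datasource) tables and partitions, Spark already
    // deletes files directly, so PURGE changes nothing there.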
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala | 63
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala | 2
8 files changed, 54 insertions, 45 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index f77801fd86..c5f4d58da4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -622,13 +622,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* Create a [[DropTableCommand]] command.
*/
override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
- if (ctx.PURGE != null) {
- operationNotAllowed("DROP TABLE ... PURGE", ctx)
- }
DropTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.EXISTS != null,
- ctx.VIEW != null)
+ ctx.VIEW != null,
+ ctx.PURGE != null)
}
/**
@@ -768,13 +766,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (ctx.VIEW != null) {
operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
}
- if (ctx.PURGE != null) {
- operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx)
- }
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
- ctx.EXISTS != null)
+ ctx.EXISTS != null,
+ ctx.PURGE != null)
}
/**
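The net effect of the parser change, which the updated DDLCommandSuite further below verifies: PURGE now becomes a flag on the generated command instead of being rejected. A minimal sketch, assuming this commit's Spark source tree on the classpath:

    import org.apache.spark.sql.execution.SparkSqlParser
    import org.apache.spark.sql.internal.SQLConf

    // Parse a PURGE statement directly, the same way DDLCommandSuite does.
    val parser = new SparkSqlParser(new SQLConf)
    val plan = parser.parsePlan("DROP TABLE tab PURGE")
    // plan == DropTableCommand(TableIdentifier("tab", None),
    //           ifExists = false, isView = false, purge = true)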
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 226f61ef40..a3a057a562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -179,7 +179,8 @@ case class DescribeDatabaseCommand(
case class DropTableCommand(
tableName: TableIdentifier,
ifExists: Boolean,
- isView: Boolean) extends RunnableCommand {
+ isView: Boolean,
+ purge: Boolean) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
@@ -207,7 +208,7 @@ case class DropTableCommand(
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.refreshTable(tableName)
- catalog.dropTable(tableName, ifExists)
+ catalog.dropTable(tableName, ifExists, purge)
}
Seq.empty[Row]
}
@@ -408,7 +409,8 @@ case class AlterTableRenamePartitionCommand(
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
- ifExists: Boolean)
+ ifExists: Boolean,
+ purge: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -418,7 +420,7 @@ case class AlterTableDropPartitionCommand(
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
}
- catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+ catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
Seq.empty[Row]
}
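Note that `purge` is a plain Boolean field with no default on these commands, and every catalog call site in this diff passes it explicitly, which is what the test updates below amount to. A minimal sketch mirroring those call sites, assuming a SparkSession named `spark` (hypothetical):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // purge = false preserves the old behavior (move to trash for Hive tables).
    spark.sessionState.catalog.dropTable(
      TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)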
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index a6ae6fe2aa..1ae9b5524c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -297,7 +297,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def dropTempView(viewName: String): Unit = {
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
- sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
+ sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index e57c1716a5..001c1a1d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
@@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
.collect().toSeq == Row("listtablessuitetable", true) :: Nil)
sqlContext.sessionState.catalog.dropTable(
- TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+ TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 23c2bef53e..b170a3a77e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest {
val parsed1_table = parser.parsePlan(sql1_table)
val parsed2_table = parser.parsePlan(sql2_table)
- assertUnsupported(sql1_table + " PURGE")
- assertUnsupported(sql2_table + " PURGE")
+ val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
assertUnsupported(sql1_view)
assertUnsupported(sql2_view)
@@ -623,11 +622,14 @@ class DDLCommandSuite extends PlanTest {
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
- ifExists = true)
+ ifExists = true,
+ purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
+ val expected1_purge = expected1_table.copy(purge = true)
comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
+ comparePlans(parsed1_purge, expected1_purge)
}
test("alter table: archive partition (not supported)") {
@@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest {
val tableName1 = "db.tab"
val tableName2 = "tab"
- val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1")
- val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1")
- val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2")
- val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2")
- assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
-
- val expected1 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
- val expected2 =
- DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
- val expected3 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
- val expected4 =
- DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
-
- comparePlans(parsed1, expected1)
- comparePlans(parsed2, expected2)
- comparePlans(parsed3, expected3)
- comparePlans(parsed4, expected4)
+ val parsed = Seq(
+ s"DROP TABLE $tableName1",
+ s"DROP TABLE IF EXISTS $tableName1",
+ s"DROP TABLE $tableName2",
+ s"DROP TABLE IF EXISTS $tableName2",
+ s"DROP TABLE $tableName2 PURGE",
+ s"DROP TABLE IF EXISTS $tableName2 PURGE"
+ ).map(parser.parsePlan)
+
+ val expected = Seq(
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = false),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+ purge = true),
+ DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+ purge = true))
+
+ parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
}
test("drop view") {
@@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest {
val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
val expected1 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
+ purge = false)
val expected2 =
- DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
+ purge = false)
val expected3 =
- DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
+ purge = false)
val expected4 =
- DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
+ DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
+ purge = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b4294ed7ff..169250d9bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}.getMessage
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
- catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+ catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
assert(catalog.listDatabases().contains(dbName))
sql(s"DROP DATABASE $dbName RESTRICT")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 83d10010f9..7c394e0b0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("overwriting") {
@@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
checkAnswer(spark.table("t"), data.map(Row.fromTuple))
}
spark.sessionState.catalog.dropTable(
- TableIdentifier("tmp"), ignoreIfNotExists = true)
+ TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
}
test("SPARK-15678: not use cache on overwrite") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index d75df56dd6..07aeaeb695 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -62,7 +62,7 @@ class CatalogSuite
}
private def dropTable(name: String, db: Option[String] = None): Unit = {
- sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+ sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
}
private def createFunction(name: String, db: Option[String] = None): Unit = {