aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2017-01-03 11:43:47 -0800
committergatorsmile <gatorsmile@gmail.com>2017-01-03 11:43:47 -0800
commitb67b35f76b684c5176dc683e7491fd01b43f4467 (patch)
tree22c57087665b0e490c692ebe3f87fa6bd189b4d3
parent89bf370e4f53c02b018b23adc653cd718869489e (diff)
downloadspark-b67b35f76b684c5176dc683e7491fd01b43f4467.tar.gz
spark-b67b35f76b684c5176dc683e7491fd01b43f4467.tar.bz2
spark-b67b35f76b684c5176dc683e7491fd01b43f4467.zip
[SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned Tables in InMemoryCatalog
### What changes were proposed in this pull request? The data in the managed table should be deleted after table is dropped. However, if the partition location is not under the location of the partitioned table, it is not deleted as expected. Users can specify any location for the partition when they adding a partition. This PR is to delete partition location when dropping managed partitioned tables stored in `InMemoryCatalog`. ### How was this patch tested? Added test cases for both HiveExternalCatalog and InMemoryCatalog Author: gatorsmile <gatorsmile@gmail.com> Closes #16448 from gatorsmile/unsetSerdeProp.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala13
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala48
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala56
4 files changed, 113 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 816e4af2df..15aed5f9b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -229,9 +229,22 @@ class InMemoryCatalog(
if (tableExists(db, table)) {
val tableMeta = getTable(db, table)
if (tableMeta.tableType == CatalogTableType.MANAGED) {
+ // Delete the data/directory for each partition
+ val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+ locationAllParts.foreach { loc =>
+ val partitionPath = new Path(loc)
+ try {
+ val fs = partitionPath.getFileSystem(hadoopConfig)
+ fs.delete(partitionPath, true)
+ } catch {
+ case e: IOException =>
+ throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+ }
+ }
assert(tableMeta.storage.locationUri.isDefined,
"Managed table should always have table location, as we will assign a default location " +
"to it if it doesn't have one.")
+ // Delete the data/directory of the table
val dir = new Path(tableMeta.location)
try {
val fs = dir.getFileSystem(hadoopConfig)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 742f900840..176cccce65 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(new Path(partitionLocation) == defaultPartitionLocation)
}
+ test("create/drop partitions in managed tables with location") {
+ val catalog = newBasicCatalog()
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("db1")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("partCol1", "int")
+ .add("partCol2", "string"),
+ provider = Some("hive"),
+ partitionColumnNames = Seq("partCol1", "partCol2"))
+ catalog.createTable(table, ignoreIfExists = false)
+
+ val newLocationPart1 = newUriForDatabase()
+ val newLocationPart2 = newUriForDatabase()
+
+ val partition1 =
+ CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+ storageFormat.copy(locationUri = Some(newLocationPart1)))
+ val partition2 =
+ CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+ storageFormat.copy(locationUri = Some(newLocationPart2)))
+ catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
+ catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
+
+ assert(exists(newLocationPart1))
+ assert(exists(newLocationPart2))
+
+ // the corresponding directory is dropped.
+ catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+ ignoreIfNotExists = false, purge = false, retainData = false)
+ assert(!exists(newLocationPart1))
+
+ // all the remaining directories are dropped.
+ catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+ assert(!exists(newLocationPart2))
+ }
+
test("list partition names") {
val catalog = newBasicCatalog()
val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
@@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
@@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("my_table", Some("db1")),
tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
)
@@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val table = CatalogTable(
identifier = TableIdentifier("tbl", Some("db1")),
tableType = CatalogTableType.MANAGED,
- storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+ storage = CatalogStorageFormat.empty,
schema = new StructType()
.add("col1", "int")
.add("col2", "string")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 522158b641..59a29e8847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -400,13 +400,12 @@ case class AlterTableSerDePropertiesCommand(
/**
* Add Partition in ALTER TABLE: add the table partitions.
*
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
* An error message will be issued if the partition exists, unless 'ifNotExists' is true.
*
* The syntax of this command is:
* {{{
- * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ * PARTITION spec2 [LOCATION 'loc2']
* }}}
*/
case class AlterTableAddPartitionCommand(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f313db641b..8b34219530 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -199,6 +199,52 @@ class HiveDDLSuite
assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
}
+ test("add/drop partition with location - managed table") {
+ val tab = "tab_with_partitions"
+ withTempDir { tmpDir =>
+ val basePath = new File(tmpDir.getCanonicalPath)
+ val part1Path = new File(basePath + "/part1")
+ val part2Path = new File(basePath + "/part2")
+ val dirSet = part1Path :: part2Path :: Nil
+
+ // Before data insertion, all the directory are empty
+ assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+ withTable(tab) {
+ sql(
+ s"""
+ |CREATE TABLE $tab (key INT, value STRING)
+ |PARTITIONED BY (ds STRING, hr STRING)
+ """.stripMargin)
+ sql(
+ s"""
+ |ALTER TABLE $tab ADD
+ |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+ |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+ """.stripMargin)
+ assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+ sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
+ sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
+ // add partition will not delete the data
+ assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+ checkAnswer(
+ spark.table(tab),
+ Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
+ )
+
+ sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+ // drop partition will delete the data
+ assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+ assert(part2Path.listFiles.nonEmpty)
+
+ sql(s"DROP TABLE $tab")
+ // drop table will delete the data of the managed table
+ assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+ }
+ }
+ }
+
test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
@@ -257,9 +303,15 @@ class HiveDDLSuite
// drop partition will not delete the data of external table
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
- sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
+ sql(
+ s"""
+ |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+ |PARTITION (ds='2008-04-08', hr=11)
+ """.stripMargin)
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
- Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
+ Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+ Map("ds" -> "2008-04-08", "hr" -> "12"),
+ Map("ds" -> "2008-04-09", "hr" -> "11")))
// add partition will not delete the data
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))