author     Eric Liang <ekl@databricks.com>        2016-12-02 21:59:02 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-12-02 21:59:02 +0800
commit     7935c8470c5c162ef7213e394fe8588e5dd42ca2 (patch)
tree       0b35862d5113a400a2464c22282f6040b6bcc0d5 /sql/catalyst/src
parent     55d528f2ba0ba689dbb881616d9436dc7958e943 (diff)
[SPARK-18659][SQL] Incorrect behaviors in overwrite table for datasource tables
## What changes were proposed in this pull request?

Two bugs are addressed here:

1. INSERT OVERWRITE TABLE sometimes crashed when catalog partition management was enabled. When dropping partitions after an overwrite operation, the Hive client would attempt to delete the partition files; if the entire partition directory had already been dropped, this failed. The PR fixes this by adding a flag that controls whether the Hive client should attempt to delete the files.

2. The static partition spec for INSERT OVERWRITE TABLE was not resolved to the case-sensitive original partition names. As a result, the entire table was overwritten if the partition names were not capitalized exactly as declared.

cc yhuai cloud-fan

## How was this patch tested?

Unit tests. Surprisingly, the existing overwrite table tests did not catch these edge cases.

Author: Eric Liang <ekl@databricks.com>

Closes #16088 from ericl/spark-18659.
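The core of the catalyst-side change is threading a new `retainData` flag through `ExternalCatalog.dropPartitions`, `InMemoryCatalog`, and `SessionCatalog` (see the diff below). The following is a minimal, self-contained sketch of how a caller might use the widened signature when the partition data has already been removed together with the directory; the `PartitionCatalog` trait and `overwriteCleanup` helper are hypothetical names for illustration, not Spark code.

```scala
object DropPartitionsSketch {
  type TablePartitionSpec = Map[String, String]

  // Mirrors the widened signature introduced by this patch (trait name is hypothetical).
  trait PartitionCatalog {
    def dropPartitions(
        db: String,
        table: String,
        parts: Seq[TablePartitionSpec],
        ignoreIfNotExists: Boolean,
        purge: Boolean,
        retainData: Boolean): Unit
  }

  // During INSERT OVERWRITE the partition directory may already have been
  // deleted, so the caller asks the catalog to drop the metadata only.
  def overwriteCleanup(catalog: PartitionCatalog, spec: TablePartitionSpec): Unit = {
    catalog.dropPartitions(
      db = "db2",
      table = "tbl2",
      parts = Seq(spec),
      ignoreIfNotExists = true,
      purge = false,
      retainData = true) // skip file deletion; the data is already gone
  }
}
```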
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala       |  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala       | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala        |  5
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala  | 21
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala   | 27
5 files changed, 44 insertions(+), 22 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 14dd707fa0..259008f183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
table: String,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit
+ purge: Boolean,
+ retainData: Boolean): Unit
/**
* Override the specs of one or many existing table partitions, assuming they exist.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a3ffeaa63f..880a7a0dc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -385,7 +385,8 @@ class InMemoryCatalog(
table: String,
partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = synchronized {
+ purge: Boolean,
+ retainData: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@ class InMemoryCatalog(
}
}
- val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+ val shouldRemovePartitionLocation = if (retainData) {
+ false
+ } else {
+ getTable(db, table).tableType == CatalogTableType.MANAGED
+ }
+
// TODO: we should follow hive to roll back if one partition path failed to delete, and support
// partial partition spec.
partSpecs.foreach { p =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0b6a91fff7..da3a2079f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,13 +687,14 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
- purge: Boolean): Unit = {
+ purge: Boolean,
+ retainData: Boolean): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
- externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 303a8662d3..3b39f420af 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
catalog.dropPartitions(
- "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
resetState()
val catalog2 = newBasicCatalog()
assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
catalog2.dropPartitions(
- "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
+ retainData = false)
assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
}
@@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
+ "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
+ "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
}
@@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val catalog = newBasicCatalog()
intercept[AnalysisException] {
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
+ retainData = false)
}
catalog.dropPartitions(
- "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+ "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false)
}
test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
- purge = false)
+ purge = false, retainData = false)
assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
val fs = partPath.getFileSystem(new Configuration)
assert(fs.exists(partPath))
- catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+ catalog.dropPartitions(
+ "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
assert(fs.exists(partPath))
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3f27160d63..f9c4b2687b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
// Drop partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("db2")
@@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2"),
Seq(part2.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
// Drop multiple partitions at once
sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part1.spec, part2.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
}
@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl1", Some("unknown_db")),
Seq(),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
intercept[NoSuchTableException] {
catalog.dropPartitions(
TableIdentifier("does_not_exist", Some("db2")),
Seq(),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
}
@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
catalog.dropPartitions(
TableIdentifier("tbl2", Some("db2")),
Seq(part3.spec),
ignoreIfNotExists = true,
- purge = false)
+ purge = false,
+ retainData = false)
}
test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithMoreColumns.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")),
Seq(partWithUnknownColumns.spec),
ignoreIfNotExists = false,
- purge = false)
+ purge = false,
+ retainData = false)
}
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +