author:    gatorsmile <gatorsmile@gmail.com>  2016-05-12 11:14:40 -0700
committer: Andrew Or <andrew@databricks.com>  2016-05-12 11:14:40 -0700
commit:    be617f3d0695982f982006fdd79afe3e3730b4c4 (patch)
tree:      d15e5bc1f502c4cf61b866ccc12ec45f7de64967 /sql
parent:    470de743ecf3617babd86f50ab203e85aa975d69 (diff)
[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping
#### What changes were proposed in this pull request?

~~Currently, multiple partitions can be dropped by a single DDL command: Alter Table Drop Partition. However, the internal implementation could break atomicity: we could drop only a subset of the qualified partitions if an exception is hit while dropping one of them.~~

~~This PR contains the following behavior changes:~~
~~- disallow dropping multiple partitions by a single command~~
~~- allow users to input predicates in the partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~
~~- verify the partition spec in SessionCatalog. This ensures each partition spec in `Drop Partition` does not correspond to multiple partitions.~~

This PR has two major parts:

- Verify the partition spec in SessionCatalog, to fix the following issue:

  ```scala
  sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
  ```

  The example above uses an invalid partition spec. Without this PR, we would drop all the partitions, because the Hive metastore's `getPartitions` API returns all partitions when given an invalid spec.

- Re-implement `dropPartitions` in `HiveClientImpl`. Now we always check that all the user-specified partition specs exist before attempting to drop any partition. Previously, we started dropping partitions before the existence check of all the partition specs had completed. If any failure happens after we start dropping partitions, we log an error message indicating which partitions have been dropped and which have not.

#### How was this patch tested?

Modified the existing test cases and added new test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12801 from gatorsmile/banDropMultiPart.
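As a rough, self-contained illustration of the two validation rules described above (this is not the actual `SessionCatalog` code; the object and method names below are made up for this sketch):

```scala
// A minimal sketch of the two validation rules this patch adds to SessionCatalog.
// The object and method names here are illustrative stand-ins, not Spark classes.
object PartitionSpecValidationSketch {
  type PartitionSpec = Map[String, String]

  // Exact match (create/rename/alter/get partition): the spec must use
  // exactly the table's partition columns, in any order.
  def isExactMatch(spec: PartitionSpec, partitionColumnNames: Seq[String]): Boolean =
    spec.keys.toSeq.sorted == partitionColumnNames.sorted

  // Partial match (drop partition): every column in the spec must be one
  // of the table's partition columns.
  def isPartialMatch(spec: PartitionSpec, partitionColumnNames: Seq[String]): Boolean =
    spec.keys.forall(partitionColumnNames.contains)

  def main(args: Array[String]): Unit = {
    val cols = Seq("ds", "hr")
    println(isExactMatch(Map("hr" -> "12", "ds" -> "2008-04-09"), cols)) // true
    println(isExactMatch(Map("ds" -> "2008-04-09"), cols))               // false (missing hr)
    println(isPartialMatch(Map("ds" -> "2008-04-09"), cols))             // true
    println(isPartialMatch(Map("unknownCol" -> "12"), cols))             // false (unknown column)
  }
}
```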
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala47
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala6
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala116
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala78
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala50
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala9
6 files changed, 248 insertions, 58 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab51de..54b30d3898 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
*/
def dropPartitions(
tableName: TableIdentifier,
- parts: Seq[TablePartitionSpec],
+ specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = {
+ requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
- externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
}
/**
@@ -542,6 +544,9 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
+ val tableMetadata = getTableMetadata(tableName)
+ requireExactMatchedPartitionSpec(specs, tableMetadata)
+ requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -559,6 +564,7 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -571,6 +577,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+ requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -595,6 +602,42 @@ class SessionCatalog(
externalCatalog.listPartitions(db, table, partialSpec)
}
+ /**
+ * Verify that the input partition spec exactly matches the partition spec defined for the table.
+ * The columns must be the same, but the order can be different.
+ */
+ private def requireExactMatchedPartitionSpec(
+ specs: Seq[TablePartitionSpec],
+ table: CatalogTable): Unit = {
+ val defined = table.partitionColumnNames.sorted
+ specs.foreach { s =>
+ if (s.keys.toSeq.sorted != defined) {
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
+ s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
+ s"table '${table.identifier}'")
+ }
+ }
+ }
+
+ /**
+ * Verify that the input partition spec partially matches the partition spec defined for the table.
+ * That is, the columns of the partition spec must be a subset of the defined partition columns.
+ */
+ private def requirePartialMatchedPartitionSpec(
+ specs: Seq[TablePartitionSpec],
+ table: CatalogTable): Unit = {
+ val defined = table.partitionColumnNames
+ specs.foreach { s =>
+ if (!s.keys.forall(defined.contains)) {
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
+ s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
+ s"in table '${table.identifier}'")
+ }
+ }
+ }
+
// ----------------------------------------------------------------------------
// Functions
// ----------------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae190c0da6..377e64ba01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -627,6 +627,12 @@ abstract class CatalogTestUtils {
lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+ lazy val partWithMixedOrder = CatalogTablePartition(Map("b" -> "6", "a" -> "6"), storageFormat)
+ lazy val partWithLessColumns = CatalogTablePartition(Map("a" -> "1"), storageFormat)
+ lazy val partWithMoreColumns =
+ CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
+ lazy val partWithUnknownColumns =
+ CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
lazy val funcClass = "org.apache.spark.myFunc"
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 726b7a1e03..91e2e077cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -482,8 +482,10 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
// Create partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
- sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
- assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3)))
+ sessionCatalog.createPartitions(
+ TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(
+ externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
}
test("create partitions when database/table does not exist") {
@@ -508,6 +510,31 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
}
+ test("create partitions with invalid part spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1, partWithLessColumns), ignoreIfExists = false)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1, partWithMoreColumns), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ }
+
test("drop partitions") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
@@ -565,6 +592,28 @@ class SessionCatalogSuite extends SparkFunSuite {
ignoreIfNotExists = true)
}
+ test("drop partitions with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithMoreColumns.spec),
+ ignoreIfNotExists = false)
+ }
+ assert(e.getMessage.contains(
+ "Partition spec is invalid. The spec (a, b, c) must be contained within " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithUnknownColumns.spec),
+ ignoreIfNotExists = false)
+ }
+ assert(e.getMessage.contains(
+ "Partition spec is invalid. The spec (a, unknown) must be contained within " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ }
+
test("get partition") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalog.getPartition(
@@ -591,6 +640,25 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("get partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("rename partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
@@ -633,6 +701,31 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("rename partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithLessColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("alter partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newLocation = newUriForDatabase()
@@ -673,6 +766,25 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("alter partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("list partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 64b90b1ed6..82123bec88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -88,10 +88,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
CatalogColumn("col1", "int"),
CatalogColumn("col2", "string"),
CatalogColumn("a", "int"),
- CatalogColumn("b", "int"),
- CatalogColumn("c", "int"),
- CatalogColumn("d", "int")),
- partitionColumnNames = Seq("a", "b", "c", "d"),
+ CatalogColumn("b", "int")),
+ partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -563,9 +561,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("alter table: rename partition") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
+ val part1 = Map("a" -> "1", "b" -> "q")
+ val part2 = Map("a" -> "2", "b" -> "c")
+ val part3 = Map("a" -> "3", "b" -> "p")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -573,22 +571,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createTablePartition(catalog, part3, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2, part3))
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
- sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='200', b='c')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+ Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
// rename without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+ sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+ Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
// table to alter does not exist
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
// partition to rename does not exist
- intercept[AnalysisException] {
- sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+ intercept[NoSuchPartitionException] {
+ sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
}
}
@@ -729,7 +727,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val partSpec = Map("a" -> "1")
+ val partSpec = Map("a" -> "1", "b" -> "2")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, partSpec, tableIdent)
@@ -762,7 +760,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
verifyLocation("/path/to/your/lovely/heart")
// set table partition location
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
}
verifyLocation("/path/to/part/ways", Some(partSpec))
// set table location without explicitly specifying database
@@ -771,7 +769,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
verifyLocation("/swanky/steak/place")
// set table partition location without explicitly specifying database
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+ sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
}
verifyLocation("vienna", Some(partSpec))
// table to alter does not exist
@@ -833,10 +831,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
- val part4 = Map("d" -> "4")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val part3 = Map("a" -> "3", "b" -> "7")
+ val part4 = Map("a" -> "4", "b" -> "8")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -846,18 +844,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
- "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+ "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
}
// add partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -865,14 +863,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// table to alter does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
}
// partition to add already exists
intercept[AnalysisException] {
- sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -883,10 +881,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
- val part4 = Map("d" -> "4")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val part3 = Map("a" -> "3", "b" -> "7")
+ val part4 = Map("a" -> "4", "b" -> "8")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -899,7 +897,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+ sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
@@ -907,24 +905,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// drop partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
}
if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
}
// table to alter does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+ sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")
}
// partition to drop does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+ sql("ALTER TABLE tab1 DROP PARTITION (a='300')")
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
}
if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb32459202..78c457b6c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls
import org.apache.hadoop.conf.Configuration
@@ -405,20 +406,43 @@ private[hive] class HiveClientImpl(
ignoreIfNotExists: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
- specs.foreach { s =>
- // The provided spec here can be a partial spec, i.e. it will match all partitions
- // whose specs are supersets of this partial spec. E.g. If a table has partitions
- // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
- val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
- if (matchingParts.isEmpty && !ignoreIfNotExists) {
- throw new AnalysisException(
- s"partition to drop '$s' does not exist in table '$table' database '$db'")
- }
- matchingParts.foreach { hivePartition =>
- val dropOptions = new PartitionDropOptions
- dropOptions.ifExists = ignoreIfNotExists
- client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+ // check all the specs first and collect all the matching partitions
+ val matchingParts =
+ specs.flatMap { s =>
+ // The provided spec here can be a partial spec, i.e. it will match all partitions
+ // whose specs are supersets of this partial spec. E.g. If a table has partitions
+ // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+ val parts = client.getPartitions(hiveTable, s.asJava).asScala
+ if (parts.isEmpty && !ignoreIfNotExists) {
+ throw new AnalysisException(
+ s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
+ s"database '$db'")
+ }
+ parts.map(_.getValues)
+ }.distinct
+ var droppedParts = ArrayBuffer.empty[java.util.List[String]]
+ matchingParts.foreach { partition =>
+ val dropOptions = new PartitionDropOptions
+ dropOptions.ifExists = ignoreIfNotExists
+ try {
+ client.dropPartition(db, table, partition, dropOptions)
+ } catch {
+ case e: Exception =>
+ val remainingParts = matchingParts.toBuffer -- droppedParts
+ logError(
+ s"""
+ |======================
+ |Attempt to drop the partition specs in table '$table' database '$db':
+ |${specs.mkString("\n")}
+ |In this attempt, the following partitions have been dropped successfully:
+ |${droppedParts.mkString("\n")}
+ |The remaining partitions have not been dropped:
+ |${remainingParts.mkString("\n")}
+ |======================
+ """.stripMargin)
+ throw e
}
+ droppedParts += partition
}
}
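For reference, the control flow added to `dropPartitions` above can be distilled into a self-contained sketch. `resolvePartitions` and `dropOne` below are hypothetical callbacks standing in for the Hive client calls, not real Spark APIs:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the "validate everything first, then drop" flow:
// resolve every spec up front, then drop one by one while tracking progress,
// so a mid-way failure can report what was and was not dropped.
object DropPartitionsSketch {
  type Spec = Map[String, String]

  def dropAllOrReport[P](
      specs: Seq[Spec],
      ignoreIfNotExists: Boolean)(
      resolvePartitions: Spec => Seq[P],
      dropOne: P => Unit): Unit = {
    // Phase 1: resolve all specs before dropping anything, so that a
    // non-existent spec fails before any partition has been removed.
    val matching = specs.flatMap { s =>
      val parts = resolvePartitions(s)
      if (parts.isEmpty && !ignoreIfNotExists) {
        throw new IllegalArgumentException(s"Partition spec '$s' does not exist")
      }
      parts
    }.distinct

    // Phase 2: drop partitions one by one, reporting progress if a drop fails.
    val dropped = ArrayBuffer.empty[P]
    matching.foreach { p =>
      try {
        dropOne(p)
      } catch {
        case e: Exception =>
          val remaining = matching.filterNot(dropped.contains)
          Console.err.println(
            s"Dropped so far: ${dropped.mkString(", ")}; not dropped: ${remaining.mkString(", ")}")
          throw e
      }
      dropped += p
    }
  }
}
```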
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ae61322844..e2cef38556 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -184,10 +184,17 @@ class HiveDDLSuite
// After data insertion, all the directory are not empty
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+ val message = intercept[AnalysisException] {
+ sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
+ }
+ assert(message.getMessage.contains(
+ "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
+ "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+
sql(
s"""
|ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-08'),
- |PARTITION (ds='2008-04-09', hr='12')
+ |PARTITION (hr='12')
""".stripMargin)
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
Set(Map("ds" -> "2008-04-09", "hr" -> "11")))