 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       |  47
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala |   6
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 116
 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala                |  78
 sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala                |  50
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala               |   9
 6 files changed, 248 insertions(+), 58 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab51de..54b30d3898 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
*/
def dropPartitions(
tableName: TableIdentifier,
- parts: Seq[TablePartitionSpec],
+ specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = {
+ requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
- externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+ externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
}
/**
@@ -542,6 +544,9 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
+ val tableMetadata = getTableMetadata(tableName)
+ requireExactMatchedPartitionSpec(specs, tableMetadata)
+ requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -559,6 +564,7 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -571,6 +577,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+ requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
@@ -595,6 +602,42 @@ class SessionCatalog(
externalCatalog.listPartitions(db, table, partialSpec)
}
+ /**
+ * Verify whether the input partition spec exactly matches the partition spec defined in the table.
+ * The columns must be the same, but their order can differ.
+ */
+ private def requireExactMatchedPartitionSpec(
+ specs: Seq[TablePartitionSpec],
+ table: CatalogTable): Unit = {
+ val defined = table.partitionColumnNames.sorted
+ specs.foreach { s =>
+ if (s.keys.toSeq.sorted != defined) {
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
+ s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
+ s"table '${table.identifier}'")
+ }
+ }
+ }
+
+ /**
+ * Verify whether the input partition spec partially matches the partition spec defined in the table.
+ * That is, the columns of the partition spec must be a subset of the defined partition columns.
+ */
+ private def requirePartialMatchedPartitionSpec(
+ specs: Seq[TablePartitionSpec],
+ table: CatalogTable): Unit = {
+ val defined = table.partitionColumnNames
+ specs.foreach { s =>
+ if (!s.keys.forall(defined.contains)) {
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
+ s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
+ s"in table '${table.identifier}'")
+ }
+ }
+ }
+
// ----------------------------------------------------------------------------
// Functions
// ----------------------------------------------------------------------------
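
The two new helpers are plain set comparisons over the spec keys: createPartitions, renamePartitions, alterPartitions, and getPartition use the exact rule, while dropPartitions uses the partial one. As a standalone illustration of the intended semantics for a table partitioned by (a, b) (a sketch over plain collections, not the SessionCatalog API itself):

object PartitionSpecCheckSketch {
  // In Spark, TablePartitionSpec is just Map[String, String].
  type TablePartitionSpec = Map[String, String]

  // "Exact" match: the spec must name exactly the partition columns, in any order.
  def exactlyMatches(spec: TablePartitionSpec, partitionCols: Seq[String]): Boolean =
    spec.keys.toSeq.sorted == partitionCols.sorted

  // "Partial" match: every column named by the spec must be a defined partition column.
  def partiallyMatches(spec: TablePartitionSpec, partitionCols: Seq[String]): Boolean =
    spec.keys.forall(partitionCols.contains)

  def main(args: Array[String]): Unit = {
    val cols = Seq("a", "b")
    println(exactlyMatches(Map("b" -> "2", "a" -> "1"), cols))   // true: order does not matter
    println(exactlyMatches(Map("a" -> "1"), cols))               // false: column b is missing
    println(partiallyMatches(Map("a" -> "1"), cols))             // true: a subset is allowed
    println(partiallyMatches(Map("unknown" -> "1"), cols))       // false: unknown column
  }
}
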
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae190c0da6..377e64ba01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -627,6 +627,12 @@ abstract class CatalogTestUtils {
lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+ lazy val partWithMixedOrder = CatalogTablePartition(Map("b" -> "6", "a" -> "6"), storageFormat)
+ lazy val partWithLessColumns = CatalogTablePartition(Map("a" -> "1"), storageFormat)
+ lazy val partWithMoreColumns =
+ CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
+ lazy val partWithUnknownColumns =
+ CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
lazy val funcClass = "org.apache.spark.myFunc"
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 726b7a1e03..91e2e077cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -482,8 +482,10 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
// Create partitions without explicitly specifying database
sessionCatalog.setCurrentDatabase("mydb")
- sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
- assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3)))
+ sessionCatalog.createPartitions(
+ TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
+ assert(catalogPartitionsEqual(
+ externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
}
test("create partitions when database/table does not exist") {
@@ -508,6 +510,31 @@ class SessionCatalogSuite extends SparkFunSuite {
TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
}
+ test("create partitions with invalid part spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1, partWithLessColumns), ignoreIfExists = false)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(part1, partWithMoreColumns), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ }
+
test("drop partitions") {
val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
@@ -565,6 +592,28 @@ class SessionCatalogSuite extends SparkFunSuite {
ignoreIfNotExists = true)
}
+ test("drop partitions with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithMoreColumns.spec),
+ ignoreIfNotExists = false)
+ }
+ assert(e.getMessage.contains(
+ "Partition spec is invalid. The spec (a, b, c) must be contained within " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithUnknownColumns.spec),
+ ignoreIfNotExists = false)
+ }
+ assert(e.getMessage.contains(
+ "Partition spec is invalid. The spec (a, unknown) must be contained within " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ }
+
test("get partition") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalog.getPartition(
@@ -591,6 +640,25 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("get partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("rename partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
@@ -633,6 +701,31 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("rename partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithLessColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("alter partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
val newLocation = newUriForDatabase()
@@ -673,6 +766,25 @@ class SessionCatalogSuite extends SparkFunSuite {
}
}
+ test("alter partition with invalid partition spec") {
+ val catalog = new SessionCatalog(newBasicCatalog())
+ var e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+ "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ }
+
test("list partitions") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 64b90b1ed6..82123bec88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -88,10 +88,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
CatalogColumn("col1", "int"),
CatalogColumn("col2", "string"),
CatalogColumn("a", "int"),
- CatalogColumn("b", "int"),
- CatalogColumn("c", "int"),
- CatalogColumn("d", "int")),
- partitionColumnNames = Seq("a", "b", "c", "d"),
+ CatalogColumn("b", "int")),
+ partitionColumnNames = Seq("a", "b"),
createTime = 0L)
}
@@ -563,9 +561,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("alter table: rename partition") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
+ val part1 = Map("a" -> "1", "b" -> "q")
+ val part2 = Map("a" -> "2", "b" -> "c")
+ val part3 = Map("a" -> "3", "b" -> "p")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -573,22 +571,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createTablePartition(catalog, part3, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2, part3))
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
- sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='200', b='c')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+ Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
// rename without explicitly specifying database
catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+ sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
- Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+ Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
// table to alter does not exist
- intercept[AnalysisException] {
+ intercept[NoSuchTableException] {
sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
// partition to rename does not exist
- intercept[AnalysisException] {
- sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+ intercept[NoSuchPartitionException] {
+ sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
}
}
@@ -729,7 +727,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val partSpec = Map("a" -> "1")
+ val partSpec = Map("a" -> "1", "b" -> "2")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, partSpec, tableIdent)
@@ -762,7 +760,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
verifyLocation("/path/to/your/lovely/heart")
// set table partition location
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+ sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
}
verifyLocation("/path/to/part/ways", Some(partSpec))
// set table location without explicitly specifying database
@@ -771,7 +769,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
verifyLocation("/swanky/steak/place")
// set table partition location without explicitly specifying database
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+ sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
}
verifyLocation("vienna", Some(partSpec))
// table to alter does not exist
@@ -833,10 +831,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
- val part4 = Map("d" -> "4")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val part3 = Map("a" -> "3", "b" -> "7")
+ val part4 = Map("a" -> "4", "b" -> "8")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -846,18 +844,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
- "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+ "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
- assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+ assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
}
// add partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -865,14 +863,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// table to alter does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
}
// partition to add already exists
intercept[AnalysisException] {
- sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+ sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -883,10 +881,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
- val part1 = Map("a" -> "1")
- val part2 = Map("b" -> "2")
- val part3 = Map("c" -> "3")
- val part4 = Map("d" -> "4")
+ val part1 = Map("a" -> "1", "b" -> "5")
+ val part2 = Map("a" -> "2", "b" -> "6")
+ val part3 = Map("a" -> "3", "b" -> "7")
+ val part4 = Map("a" -> "4", "b" -> "8")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
@@ -899,7 +897,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+ sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
}
if (!isDatasourceTable) {
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
@@ -907,24 +905,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
// drop partitions without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
}
if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
}
// table to alter does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+ sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")
}
// partition to drop does not exist
intercept[AnalysisException] {
- sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+ sql("ALTER TABLE tab1 DROP PARTITION (a='300')")
}
maybeWrapException(isDatasourceTable) {
- sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+ sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
}
if (!isDatasourceTable) {
- assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+ assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
}
}
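
With tab1 now partitioned by (a, b), the behavior these tests pin down is: ADD and RENAME must spell out both partition columns, DROP may use a subset, and any undefined column is rejected at analysis time. A minimal usage sketch (assuming a SparkSession named spark; the table and values are illustrative, not part of this change):

// Full spec required for ADD and RENAME: both partition columns must appear.
spark.sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS PARTITION (a='1', b='5')")
spark.sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='5') RENAME TO PARTITION (a='10', b='5')")
// Partial spec allowed for DROP: this removes every partition with a='10', whatever b is.
spark.sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='10')")
// A spec naming a column that is not a partition column now fails with
// "Partition spec is invalid. The spec (a, unknown) must be contained within ..."
spark.sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='10', unknown='1')")
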
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb32459202..78c457b6c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls
import org.apache.hadoop.conf.Configuration
@@ -405,20 +406,43 @@ private[hive] class HiveClientImpl(
ignoreIfNotExists: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
- specs.foreach { s =>
- // The provided spec here can be a partial spec, i.e. it will match all partitions
- // whose specs are supersets of this partial spec. E.g. If a table has partitions
- // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
- val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
- if (matchingParts.isEmpty && !ignoreIfNotExists) {
- throw new AnalysisException(
- s"partition to drop '$s' does not exist in table '$table' database '$db'")
- }
- matchingParts.foreach { hivePartition =>
- val dropOptions = new PartitionDropOptions
- dropOptions.ifExists = ignoreIfNotExists
- client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+ // do the check first and collect all the matching partitions
+ val matchingParts =
+ specs.flatMap { s =>
+ // The provided spec here can be a partial spec, i.e. it will match all partitions
+ // whose specs are supersets of this partial spec. E.g. If a table has partitions
+ // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+ val parts = client.getPartitions(hiveTable, s.asJava).asScala
+ if (parts.isEmpty && !ignoreIfNotExists) {
+ throw new AnalysisException(
+ s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
+ s"database '$db'")
+ }
+ parts.map(_.getValues)
+ }.distinct
+ var droppedParts = ArrayBuffer.empty[java.util.List[String]]
+ matchingParts.foreach { partition =>
+ val dropOptions = new PartitionDropOptions
+ dropOptions.ifExists = ignoreIfNotExists
+ try {
+ client.dropPartition(db, table, partition, dropOptions)
+ } catch {
+ case e: Exception =>
+ val remainingParts = matchingParts.toBuffer -- droppedParts
+ logError(
+ s"""
+ |======================
+ |Attempt to drop the partition specs in table '$table' database '$db':
+ |${specs.mkString("\n")}
+ |In this attempt, the following partitions have been dropped successfully:
+ |${droppedParts.mkString("\n")}
+ |The remaining partitions have not been dropped:
+ |${remainingParts.mkString("\n")}
+ |======================
+ """.stripMargin)
+ throw e
}
+ droppedParts += partition
}
}
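
The rewritten dropPartitions resolves every spec up front, so an invalid or missing spec fails before anything is dropped, and then drops the matches one at a time while recording progress, so a failure can report what was and was not dropped. A simplified sketch of that collect-then-mutate pattern (dropOne is a stand-in for the Hive client call, not a real API):

import scala.collection.mutable.ArrayBuffer

object DropAllSketch {
  def dropAll[T](toDrop: Seq[T])(dropOne: T => Unit): Unit = {
    val dropped = ArrayBuffer.empty[T]
    toDrop.foreach { p =>
      try {
        dropOne(p)
      } catch {
        case e: Exception =>
          val remaining = toDrop.filterNot(dropped.contains)
          // HiveClientImpl logs the equivalent summary via logError before rethrowing.
          println(s"Dropped so far: ${dropped.mkString(", ")}; not dropped: ${remaining.mkString(", ")}")
          throw e
      }
      dropped += p
    }
  }

  def main(args: Array[String]): Unit = {
    // Toy usage: dropping 3 fails, and the message shows 1 and 2 were already dropped.
    dropAll(Seq(1, 2, 3, 4)) { p =>
      if (p == 3) throw new RuntimeException(s"cannot drop $p")
    }
  }
}
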
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ae61322844..e2cef38556 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -184,10 +184,17 @@ class HiveDDLSuite
// After data insertion, all the directories are non-empty
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+ val message = intercept[AnalysisException] {
+ sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
+ }
+ assert(message.getMessage.contains(
+ "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
+ "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+
sql(
s"""
|ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-08'),
- |PARTITION (ds='2008-04-09', hr='12')
+ |PARTITION (hr='12')
""".stripMargin)
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
Set(Map("ds" -> "2008-04-09", "hr" -> "11")))