6 files changed, 248 insertions, 58 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0fc4ab51de..54b30d3898 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -510,6 +510,7 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -523,13 +524,14 @@ class SessionCatalog(
    */
   def dropPartitions(
       tableName: TableIdentifier,
-      parts: Seq[TablePartitionSpec],
+      specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean): Unit = {
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
   }
 
   /**
@@ -542,6 +544,9 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
+    val tableMetadata = getTableMetadata(tableName)
+    requireExactMatchedPartitionSpec(specs, tableMetadata)
+    requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -559,6 +564,7 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -571,6 +577,7 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
+    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
@@ -595,6 +602,42 @@ class SessionCatalog(
     externalCatalog.listPartitions(db, table, partialSpec)
   }
 
+  /**
+   * Verify that the input partition spec exactly matches the existing defined partition spec.
+   * The columns must be the same, but the order may differ.
+   */
+  private def requireExactMatchedPartitionSpec(
+      specs: Seq[TablePartitionSpec],
+      table: CatalogTable): Unit = {
+    val defined = table.partitionColumnNames.sorted
+    specs.foreach { s =>
+      if (s.keys.toSeq.sorted != defined) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " +
+            s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " +
+            s"table '${table.identifier}'")
+      }
+    }
+  }
+
+  /**
+   * Verify that the input partition spec partially matches the existing defined partition spec.
+   * That is, the columns of the partition spec should be a subset of the defined partition spec.
+   */
+  private def requirePartialMatchedPartitionSpec(
+      specs: Seq[TablePartitionSpec],
+      table: CatalogTable): Unit = {
+    val defined = table.partitionColumnNames
+    specs.foreach { s =>
+      if (!s.keys.forall(defined.contains)) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " +
+            s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " +
+            s"in table '${table.identifier}'")
+      }
+    }
+  }
+
   // ----------------------------------------------------------------------------
   // Functions
   // ----------------------------------------------------------------------------
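The two validators above are the core of the change: every operation that resolves a concrete partition (create, rename, alter, get) now requires an exact, order-insensitive match against the table's partition columns, while drop only requires a subset. A minimal standalone sketch of that logic, assuming a table partitioned by (a, b); the names here are illustrative and not part of the patch:

    // PartitionSpecCheckSketch.scala -- hypothetical demo, not in the patch
    object PartitionSpecCheckSketch {
      type TablePartitionSpec = Map[String, String] // the same alias Spark uses

      val definedPartCols = Seq("a", "b") // stands in for table.partitionColumnNames

      // Exact match: same column set, any order (keys are compared after sorting).
      def isExactMatch(spec: TablePartitionSpec): Boolean =
        spec.keys.toSeq.sorted == definedPartCols.sorted

      // Partial match: the spec's columns must be a subset of the defined columns.
      def isPartialMatch(spec: TablePartitionSpec): Boolean =
        spec.keys.forall(definedPartCols.contains)

      def main(args: Array[String]): Unit = {
        assert(isExactMatch(Map("b" -> "2", "a" -> "1")))          // mixed order is fine
        assert(!isExactMatch(Map("a" -> "1")))                     // missing column b
        assert(!isExactMatch(Map("a" -> "1", "unknown" -> "2")))   // unknown column
        assert(isPartialMatch(Map("a" -> "1")))                    // subset: ok for drop
        assert(!isPartialMatch(Map("a" -> "1", "unknown" -> "2"))) // still rejected
        println("all spec checks behaved as expected")
      }
    }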
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae190c0da6..377e64ba01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -627,6 +627,12 @@ abstract class CatalogTestUtils {
   lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
   lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
   lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat)
+  lazy val partWithMixedOrder = CatalogTablePartition(Map("b" -> "6", "a" -> "6"), storageFormat)
+  lazy val partWithLessColumns = CatalogTablePartition(Map("a" -> "1"), storageFormat)
+  lazy val partWithMoreColumns =
+    CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
+  lazy val partWithUnknownColumns =
+    CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**
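The four new fixtures map one-to-one onto the cases the validators distinguish, relative to the (a, b) partition columns of the shared test tables. A sketch of how each one classifies; the fixture specs are taken from the diff, but the classification code itself is illustrative:

    val defined = Seq("a", "b")
    val fixtureSpecs = Map(
      "partWithMixedOrder"     -> Map("b" -> "6", "a" -> "6"),
      "partWithLessColumns"    -> Map("a" -> "1"),
      "partWithMoreColumns"    -> Map("a" -> "5", "b" -> "6", "c" -> "7"),
      "partWithUnknownColumns" -> Map("a" -> "5", "unknown" -> "6"))
    fixtureSpecs.foreach { case (name, spec) =>
      val exact = spec.keys.toSeq.sorted == defined.sorted
      val partial = spec.keys.forall(defined.contains)
      println(s"$name: exactMatch=$exact, partialMatch=$partial")
    }
    // partWithMixedOrder:     exactMatch=true  (column order is irrelevant)
    // partWithLessColumns:    exactMatch=false, partialMatch=true (accepted only by drop)
    // the other two fail both checks and are rejected by every operation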
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 726b7a1e03..91e2e077cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -482,8 +482,10 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2)))
     // Create partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("mydb")
-    sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false)
-    assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3)))
+    sessionCatalog.createPartitions(
+      TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false)
+    assert(catalogPartitionsEqual(
+      externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder)))
   }
 
   test("create partitions when database/table does not exist") {
@@ -508,6 +510,31 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true)
   }
 
+  test("create partitions with invalid part spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part1, partWithLessColumns), ignoreIfExists = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(part1, partWithMoreColumns), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithUnknownColumns, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+  }
+
   test("drop partitions") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
@@ -565,6 +592,28 @@ class SessionCatalogSuite extends SparkFunSuite {
       ignoreIfNotExists = true)
   }
 
+  test("drop partitions with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithMoreColumns.spec),
+        ignoreIfNotExists = false)
+    }
+    assert(e.getMessage.contains(
+      "Partition spec is invalid. The spec (a, b, c) must be contained within " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithUnknownColumns.spec),
+        ignoreIfNotExists = false)
+    }
+    assert(e.getMessage.contains(
+      "Partition spec is invalid. The spec (a, unknown) must be contained within " +
+        "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+  }
+
   test("get partition") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.getPartition(
@@ -591,6 +640,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("get partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("rename partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101"))
@@ -633,6 +701,31 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("rename partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithLessColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithMoreColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("alter partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val newLocation = newUriForDatabase()
@@ -673,6 +766,25 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
+  test("alter partition with invalid partition spec") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    var e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
+      "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+  }
+
   test("list partitions") {
     val catalog = new SessionCatalog(newBasicCatalog())
     assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2))
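Each of the new suites repeats the same intercept-then-check-message pattern three times. If the pattern keeps growing, a small helper could collapse each case to one call. A sketch only, assuming it lives inside the suite; assertInvalidSpec is hypothetical and not part of the patch:

    // Hypothetical helper: run `body`, expect an AnalysisException, check its message.
    def assertInvalidSpec(expectedMsg: String)(body: => Unit): Unit = {
      val e = intercept[AnalysisException](body)
      assert(e.getMessage.contains(expectedMsg))
    }

    // Example usage inside "get partition with invalid partition spec":
    // assertInvalidSpec("The spec (a) must match the partition spec (a, b)") {
    //   catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec)
    // }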
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 64b90b1ed6..82123bec88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -88,10 +88,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         CatalogColumn("col1", "int"),
         CatalogColumn("col2", "string"),
         CatalogColumn("a", "int"),
-        CatalogColumn("b", "int"),
-        CatalogColumn("c", "int"),
-        CatalogColumn("d", "int")),
-      partitionColumnNames = Seq("a", "b", "c", "d"),
+        CatalogColumn("b", "int")),
+      partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
@@ -563,9 +561,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("alter table: rename partition") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
+    val part1 = Map("a" -> "1", "b" -> "q")
+    val part2 = Map("a" -> "2", "b" -> "c")
+    val part3 = Map("a" -> "3", "b" -> "p")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -573,22 +571,22 @@
     createTablePartition(catalog, part3, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")
-    sql("ALTER TABLE dbx.tab1 PARTITION (b='2') RENAME TO PARTITION (b='200')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='200', b='c')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "100"), Map("b" -> "200"), part3))
+      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
     // rename without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 PARTITION (a='100') RENAME TO PARTITION (a='10')")
+    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "10"), Map("b" -> "200"), part3))
+      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
     // table to alter does not exist
-    intercept[AnalysisException] {
+    intercept[NoSuchTableException] {
       sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
     }
     // partition to rename does not exist
-    intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 PARTITION (x='300') RENAME TO PARTITION (x='333')")
+    intercept[NoSuchPartitionException] {
+      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
     }
   }
@@ -729,7 +727,7 @@
   private def testSetLocation(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val partSpec = Map("a" -> "1")
+    val partSpec = Map("a" -> "1", "b" -> "2")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, partSpec, tableIdent)
@@ -762,7 +760,7 @@
     verifyLocation("/path/to/your/lovely/heart")
     // set table partition location
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
     }
     verifyLocation("/path/to/part/ways", Some(partSpec))
     // set table location without explicitly specifying database
@@ -771,7 +769,7 @@
     verifyLocation("/swanky/steak/place")
     // set table partition location without explicitly specifying database
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+      sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
     }
     verifyLocation("vienna", Some(partSpec))
     // table to alter does not exist
@@ -833,10 +831,10 @@
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
-    val part4 = Map("d" -> "4")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val part3 = Map("a" -> "3", "b" -> "7")
+    val part4 = Map("a" -> "4", "b" -> "8")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -846,18 +844,18 @@
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
-        "PARTITION (b='2') LOCATION 'paris' PARTITION (c='3')")
+        "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
       assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
-      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Some("paris"))
+      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
       assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
     }
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -865,14 +863,14 @@
     }
     // table to alter does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
     }
     // partition to add already exists
     intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 ADD PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (d='4')")
+      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
@@ -883,10 +881,10 @@
   private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1")
-    val part2 = Map("b" -> "2")
-    val part3 = Map("c" -> "3")
-    val part4 = Map("d" -> "4")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val part3 = Map("a" -> "3", "b" -> "7")
+    val part4 = Map("a" -> "4", "b" -> "8")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
@@ -899,7 +897,7 @@
       convertToDatasourceTable(catalog, tableIdent)
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (d='4'), PARTITION (c='3')")
+      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
     }
     if (!isDatasourceTable) {
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
@@ -907,24 +905,24 @@
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (b='2')")
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
     }
     if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
     // table to alter does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (b='2')")
+      sql("ALTER TABLE does_not_exist DROP IF EXISTS PARTITION (a='2')")
     }
     // partition to drop does not exist
     intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 DROP PARTITION (x='300')")
+      sql("ALTER TABLE tab1 DROP PARTITION (a='300')")
     }
     maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (x='300')")
+      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
     }
     if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec) == Seq(part1))
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
     }
   }
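The effect visible at the SQL layer: operations that resolve one concrete partition (ADD, RENAME, SET LOCATION) must now spell out every partition column, while DROP PARTITION still accepts a partial spec. A sketch, assuming a SparkSession named spark and the dbx.tab1 table partitioned by (a, b) as above:

    // Accepted: the spec names both partition columns; column order is irrelevant.
    spark.sql("ALTER TABLE dbx.tab1 PARTITION (b='q', a='1') RENAME TO PARTITION (a='100', b='p')")

    // Rejected with "Partition spec is invalid. The spec (a) must match ...":
    // spark.sql("ALTER TABLE dbx.tab1 PARTITION (a='1') RENAME TO PARTITION (a='100')")

    // Still accepted: DROP uses the partial-match rule, so a subset spec works
    // and removes every partition whose spec contains (a='100').
    spark.sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='100')")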
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb32459202..78c457b6c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.io.{File, PrintStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.conf.Configuration
@@ -405,20 +406,43 @@ private[hive] class HiveClientImpl(
       ignoreIfNotExists: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
-    specs.foreach { s =>
-      // The provided spec here can be a partial spec, i.e. it will match all partitions
-      // whose specs are supersets of this partial spec. E.g. If a table has partitions
-      // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
-      val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
-      if (matchingParts.isEmpty && !ignoreIfNotExists) {
-        throw new AnalysisException(
-          s"partition to drop '$s' does not exist in table '$table' database '$db'")
-      }
-      matchingParts.foreach { hivePartition =>
-        val dropOptions = new PartitionDropOptions
-        dropOptions.ifExists = ignoreIfNotExists
-        client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+    // do the check first and collect all the matching partitions
+    val matchingParts =
+      specs.flatMap { s =>
+        // The provided spec here can be a partial spec, i.e. it will match all partitions
+        // whose specs are supersets of this partial spec. E.g. If a table has partitions
+        // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+        val parts = client.getPartitions(hiveTable, s.asJava).asScala
+        if (parts.isEmpty && !ignoreIfNotExists) {
+          throw new AnalysisException(
+            s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
+              s"database '$db'")
+        }
+        parts.map(_.getValues)
+      }.distinct
+    var droppedParts = ArrayBuffer.empty[java.util.List[String]]
+    matchingParts.foreach { partition =>
+      val dropOptions = new PartitionDropOptions
+      dropOptions.ifExists = ignoreIfNotExists
+      try {
+        client.dropPartition(db, table, partition, dropOptions)
+      } catch {
+        case e: Exception =>
+          val remainingParts = matchingParts.toBuffer -- droppedParts
+          logError(
+            s"""
+               |======================
+               |Attempt to drop the partition specs in table '$table' database '$db':
+               |${specs.mkString("\n")}
+               |In this attempt, the following partitions have been dropped successfully:
+               |${droppedParts.mkString("\n")}
+               |The remaining partitions have not been dropped:
+               |${remainingParts.mkString("\n")}
+               |======================
+             """.stripMargin)
+          throw e
       }
+      droppedParts += partition
     }
   }
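The rewritten dropPartitions follows a resolve-then-mutate shape: all specs are resolved up front, so a nonexistent spec aborts the command before any partition is touched, and progress is tracked so a mid-loop failure can report exactly which partitions were and were not dropped. A generic sketch of that shape, independent of the Hive APIs (dropAll is illustrative, not part of the patch):

    import scala.collection.mutable.ArrayBuffer

    // Apply `drop` to each target; on failure, report progress and rethrow.
    def dropAll[T](targets: Seq[T])(drop: T => Unit): Unit = {
      val dropped = ArrayBuffer.empty[T]
      targets.foreach { t =>
        try drop(t)
        catch {
          case e: Exception =>
            val remaining = targets.filterNot(dropped.contains)
            Console.err.println(
              s"dropped so far: ${dropped.mkString(", ")}; " +
                s"not dropped: ${remaining.mkString(", ")}")
            throw e
        }
        dropped += t // only record a target after its drop succeeds
      }
    }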
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ae61322844..e2cef38556 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -184,10 +184,17 @@ class HiveDDLSuite
     // After data insertion, all the directory are not empty
     assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
+    val message = intercept[AnalysisException] {
+      sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
+    }
+    assert(message.getMessage.contains(
+      "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
+        "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+
     sql(
       s"""
          |ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-08'),
-         |PARTITION (ds='2008-04-09', hr='12')
+         |PARTITION (hr='12')
        """.stripMargin)
 
     assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
       Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
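The rewritten statement above exercises the partial-match rule end to end: (hr='12') is a valid subset of the (ds, hr) partition columns, so the drop matches every partition whose spec is a superset of it. A sketch of that matching, assuming the test table starts with the four ds/hr combinations the suite creates:

    val partitions = Seq(
      Map("ds" -> "2008-04-08", "hr" -> "11"),
      Map("ds" -> "2008-04-08", "hr" -> "12"),
      Map("ds" -> "2008-04-09", "hr" -> "11"),
      Map("ds" -> "2008-04-09", "hr" -> "12"))
    val partial = Map("hr" -> "12")
    // A partition matches when it agrees with the partial spec on every given column.
    val matched = partitions.filter(p => partial.forall { case (k, v) => p.get(k).contains(v) })
    assert(matched.size == 2) // both hr=12 partitions are dropped
    // Combined with DROP PARTITION (ds='2008-04-08'), only (ds=2008-04-09, hr=11) remains,
    // which is exactly what the final assertion in the test verifies.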