aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-05-12 11:14:40 -0700
committerAndrew Or <andrew@databricks.com>2016-05-12 11:14:40 -0700
commitbe617f3d0695982f982006fdd79afe3e3730b4c4 (patch)
treed15e5bc1f502c4cf61b866ccc12ec45f7de64967 /sql/hive
parent470de743ecf3617babd86f50ab203e85aa975d69 (diff)
downloadspark-be617f3d0695982f982006fdd79afe3e3730b4c4.tar.gz
spark-be617f3d0695982f982006fdd79afe3e3730b4c4.tar.bz2
spark-be617f3d0695982f982006fdd79afe3e3730b4c4.zip
[SPARK-14684][SPARK-15277][SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping
#### What changes were proposed in this pull request? ~~Currently, multiple partitions are allowed to drop by using a single DDL command: Alter Table Drop Partition. However, the internal implementation could break atomicity. That means, we could just drop a subset of qualified partitions, if hitting an exception when dropping one of qualified partitions~~ ~~This PR contains the following behavior changes:~~ ~~- disallow dropping multiple partitions by a single command ~~ ~~- allow users to input predicates in partition specification and issue a nicer error message if the predicate's comparison operator is not `=`.~~ ~~- verify the partition spec in SessionCatalog. This can ensure each partition spec in `Drop Partition` does not correspond to multiple partitions.~~ This PR has two major parts: - Verify the partition spec in SessionCatalog for fixing the following issue: ```scala sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')") ``` Above example uses an invalid partition spec. Without this PR, we will drop all the partitions. The reason is Hive megastores getPartitions API returns all the partitions if we provide an invalid spec. - Re-implemented the `dropPartitions` in `HiveClientImpl`. Now, we always check if all the user-specified partition specs exist before attempting to drop the partitions. Previously, we start drop the partition before completing checking the existence of all the partition specs. If any failure happened after we start to drop the partitions, we will log an error message to indicate which partitions have been dropped and which partitions have not been dropped. #### How was this patch tested? Modified the existing test cases and added new test cases. Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12801 from gatorsmile/banDropMultiPart.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala50
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala9
2 files changed, 45 insertions, 14 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bb32459202..78c457b6c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls
import org.apache.hadoop.conf.Configuration
@@ -405,20 +406,43 @@ private[hive] class HiveClientImpl(
ignoreIfNotExists: Boolean): Unit = withHiveState {
// TODO: figure out how to drop multiple partitions in one call
val hiveTable = client.getTable(db, table, true /* throw exception */)
- specs.foreach { s =>
- // The provided spec here can be a partial spec, i.e. it will match all partitions
- // whose specs are supersets of this partial spec. E.g. If a table has partitions
- // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
- val matchingParts = client.getPartitions(hiveTable, s.asJava).asScala
- if (matchingParts.isEmpty && !ignoreIfNotExists) {
- throw new AnalysisException(
- s"partition to drop '$s' does not exist in table '$table' database '$db'")
- }
- matchingParts.foreach { hivePartition =>
- val dropOptions = new PartitionDropOptions
- dropOptions.ifExists = ignoreIfNotExists
- client.dropPartition(db, table, hivePartition.getValues, dropOptions)
+ // do the check at first and collect all the matching partitions
+ val matchingParts =
+ specs.flatMap { s =>
+ // The provided spec here can be a partial spec, i.e. it will match all partitions
+ // whose specs are supersets of this partial spec. E.g. If a table has partitions
+ // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
+ val parts = client.getPartitions(hiveTable, s.asJava).asScala
+ if (parts.isEmpty && !ignoreIfNotExists) {
+ throw new AnalysisException(
+ s"No partition is dropped. One partition spec '$s' does not exist in table '$table' " +
+ s"database '$db'")
+ }
+ parts.map(_.getValues)
+ }.distinct
+ var droppedParts = ArrayBuffer.empty[java.util.List[String]]
+ matchingParts.foreach { partition =>
+ val dropOptions = new PartitionDropOptions
+ dropOptions.ifExists = ignoreIfNotExists
+ try {
+ client.dropPartition(db, table, partition, dropOptions)
+ } catch {
+ case e: Exception =>
+ val remainingParts = matchingParts.toBuffer -- droppedParts
+ logError(
+ s"""
+ |======================
+ |Attempt to drop the partition specs in table '$table' database '$db':
+ |${specs.mkString("\n")}
+ |In this attempt, the following partitions have been dropped successfully:
+ |${droppedParts.mkString("\n")}
+ |The remaining partitions have not been dropped:
+ |${remainingParts.mkString("\n")}
+ |======================
+ """.stripMargin)
+ throw e
}
+ droppedParts += partition
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ae61322844..e2cef38556 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -184,10 +184,17 @@ class HiveDDLSuite
// After data insertion, all the directory are not empty
assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+ val message = intercept[AnalysisException] {
+ sql(s"ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-09', unknownCol='12')")
+ }
+ assert(message.getMessage.contains(
+ "Partition spec is invalid. The spec (ds, unknowncol) must be contained within the " +
+ "partition spec (ds, hr) defined in table '`default`.`exttable_with_partitions`'"))
+
sql(
s"""
|ALTER TABLE $externalTab DROP PARTITION (ds='2008-04-08'),
- |PARTITION (ds='2008-04-09', hr='12')
+ |PARTITION (hr='12')
""".stripMargin)
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
Set(Map("ds" -> "2008-04-09", "hr" -> "11")))