aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2017-01-18 02:01:30 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-18 02:01:30 +0800
commita23debd7bc8f85ea49c54b8cf3cd112cf0a803ff (patch)
treeabb2b167618351b3181ed4956dd93daf356c7359
parenta83accfcfd6a92afac5040c50577258ab83d10dd (diff)
downloadspark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.tar.gz
spark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.tar.bz2
spark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.zip
[SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec
### What changes were proposed in this pull request? Empty partition column values are not valid for partition specification. Before this PR, we accept users to do it; however, Hive metastore does not detect and disallow it too. Thus, users hit the following strange error. ```Scala val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name") df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable") spark.sql("alter table partitionedTable drop partition(partCol1='')") spark.table("partitionedTable").show() ``` In the above example, the WHOLE table is DROPPED when users specify a partition spec containing only one partition column with empty values. When the partition columns contains more than one, Hive metastore APIs simply ignore the columns with empty values and treat it as partial spec. This is also not expected. This does not follow the actual Hive behaviors. This PR is to disallow users to specify such an invalid partition spec in the `SessionCatalog` APIs. ### How was this patch tested? Added test cases Author: gatorsmile <gatorsmile@gmail.com> Closes #16583 from gatorsmile/disallowEmptyPartColValue.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala26
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala70
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala10
5 files changed, 106 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12af9e0c32..8008fcd639 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -331,7 +331,7 @@ class SessionCatalog(
def loadPartition(
name: TableIdentifier,
loadPath: String,
- partition: TablePartitionSpec,
+ spec: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@ class SessionCatalog(
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
externalCatalog.loadPartition(
- db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+ db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
}
def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
@@ -711,6 +713,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(specs)
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}
@@ -731,6 +734,8 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+ requireNonEmptyValueInPartitionSpec(specs)
+ requireNonEmptyValueInPartitionSpec(newSpecs)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
@@ -749,6 +754,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
externalCatalog.alterPartitions(db, table, parts)
}
@@ -762,6 +768,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
externalCatalog.getPartition(db, table, spec)
}
@@ -781,6 +788,7 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
partialSpec.foreach { spec =>
requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
}
externalCatalog.listPartitionNames(db, table, partialSpec)
}
@@ -801,6 +809,7 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
partialSpec.foreach { spec =>
requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
}
externalCatalog.listPartitions(db, table, partialSpec)
}
@@ -820,6 +829,19 @@ class SessionCatalog(
}
/**
+ * Verify if the input partition spec has any empty value.
+ */
+ private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+ specs.foreach { s =>
+ if (s.values.exists(_.isEmpty)) {
+ val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+ }
+ }
+ }
+
+ /**
* Verify if the input partition spec exactly matches the existing defined partition spec
* The columns must be the same but the orders could be different.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 91f464b47a..acf3bcfdaa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
lazy val partWithUnknownColumns =
CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+ lazy val partWithEmptyValue =
+ CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
lazy val funcClass = "org.apache.spark.myFunc"
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ae93dffbab..7a7de25acb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("drop partitions") {
@@ -722,6 +729,16 @@ class SessionCatalogSuite extends PlanTest {
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +
"the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithEmptyValue.spec, part1.spec),
+ ignoreIfNotExists = false,
+ purge = false,
+ retainData = false)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("get partition") {
@@ -767,6 +784,11 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("rename partitions") {
@@ -834,6 +856,13 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithEmptyValue.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("alter partitions") {
@@ -893,6 +922,11 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partition names") {
@@ -914,10 +948,24 @@ class SessionCatalogSuite extends PlanTest {
test("list partition names with invalid partial partition spec") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ var e = intercept[AnalysisException] {
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
- Some(Map("unknown" -> "unknown")))
+ Some(partWithEmptyValue.spec))
}
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partitions") {
@@ -937,10 +985,22 @@ class SessionCatalogSuite extends PlanTest {
test("list partitions with invalid partial partition spec") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.listPartitions(
- TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+ var e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
}
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partitions when database/table does not exist") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5c0e2f6ec4..9a6144c5e3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
// do the check at first and collect all the matching partitions
val matchingParts =
specs.flatMap { s =>
+ assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
// The provided spec here can be a partial spec, i.e. it will match all partitions
// whose specs are supersets of this partial spec. E.g. If a table has partitions
// (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -545,6 +546,7 @@ private[hive] class HiveClientImpl(
// -1 for result limit means "no limit/return all"
client.getPartitionNames(table.database, table.identifier.table, -1)
case Some(s) =>
+ assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
}
hivePartitionNames.asScala.sorted
@@ -568,7 +570,9 @@ private[hive] class HiveClientImpl(
val hiveTable = toHiveTable(table)
val parts = spec match {
case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
- case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+ case Some(s) =>
+ assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+ client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
}
HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
parts
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e3f1667249..ef62be39cd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -247,6 +247,16 @@ class HiveDDLSuite
}
}
+ test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+ val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+ df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+ val e = intercept[AnalysisException] {
+ spark.sql("alter table partitionedTable drop partition(partCol1='')")
+ }.getMessage
+ assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+ "partition column value"))
+ }
+
test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>