about summary refs log tree commit diff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
author    gatorsmile <gatorsmile@gmail.com>    2017-01-18 02:01:30 +0800
committer Wenchen Fan <wenchen@databricks.com>    2017-01-18 02:01:30 +0800
commit a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff (patch)
tree   abb2b167618351b3181ed4956dd93daf356c7359 /sql/catalyst/src
parent a83accfcfd6a92afac5040c50577258ab83d10dd (diff)
downloadspark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.tar.gz
spark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.tar.bz2
spark-a23debd7bc8f85ea49c54b8cf3cd112cf0a803ff.zip
[SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec
### What changes were proposed in this pull request?

Empty partition column values are not valid for partition specification. Before this PR, we accept users to do it; however, Hive metastore does not detect and disallow it too. Thus, users hit the following strange error.

```scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
spark.sql("alter table partitionedTable drop partition(partCol1='')")
spark.table("partitionedTable").show()
```

In the above example, the WHOLE table is DROPPED when users specify a partition spec containing only one partition column with empty values. When the partition columns contains more than one, Hive metastore APIs simply ignore the columns with empty values and treat it as partial spec. This is also not expected. This does not follow the actual Hive behaviors. This PR is to disallow users to specify such an invalid partition spec in the `SessionCatalog` APIs.

### How was this patch tested?

Added test cases

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16583 from gatorsmile/disallowEmptyPartColValue.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala       | 26
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala |  2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala  | 70
3 files changed, 91 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 12af9e0c32..8008fcd639 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -331,7 +331,7 @@ class SessionCatalog(
def loadPartition(
name: TableIdentifier,
loadPath: String,
- partition: TablePartitionSpec,
+ spec: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@ class SessionCatalog(
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
externalCatalog.loadPartition(
- db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+ db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
}
def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
@@ -711,6 +713,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(specs)
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
}
@@ -731,6 +734,8 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+ requireNonEmptyValueInPartitionSpec(specs)
+ requireNonEmptyValueInPartitionSpec(newSpecs)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
@@ -749,6 +754,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
externalCatalog.alterPartitions(db, table, parts)
}
@@ -762,6 +768,7 @@ class SessionCatalog(
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
externalCatalog.getPartition(db, table, spec)
}
@@ -781,6 +788,7 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
partialSpec.foreach { spec =>
requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
}
externalCatalog.listPartitionNames(db, table, partialSpec)
}
@@ -801,6 +809,7 @@ class SessionCatalog(
requireTableExists(TableIdentifier(table, Option(db)))
partialSpec.foreach { spec =>
requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireNonEmptyValueInPartitionSpec(Seq(spec))
}
externalCatalog.listPartitions(db, table, partialSpec)
}
@@ -820,6 +829,19 @@ class SessionCatalog(
}
/**
+ * Verify if the input partition spec has any empty value.
+ */
+ private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+ specs.foreach { s =>
+ if (s.values.exists(_.isEmpty)) {
+ val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+ throw new AnalysisException(
+ s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+ }
+ }
+ }
+
+ /**
* Verify if the input partition spec exactly matches the existing defined partition spec
* The columns must be the same but the orders could be different.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 91f464b47a..acf3bcfdaa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
lazy val partWithUnknownColumns =
CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+ lazy val partWithEmptyValue =
+ CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
lazy val funcClass = "org.apache.spark.myFunc"
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ae93dffbab..7a7de25acb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.createPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("drop partitions") {
@@ -722,6 +729,16 @@ class SessionCatalogSuite extends PlanTest {
assert(e.getMessage.contains(
"Partition spec is invalid. The spec (a, unknown) must be contained within " +
"the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.dropPartitions(
+ TableIdentifier("tbl2", Some("db2")),
+ Seq(partWithEmptyValue.spec, part1.spec),
+ ignoreIfNotExists = false,
+ purge = false,
+ retainData = false)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("get partition") {
@@ -767,6 +784,11 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("rename partitions") {
@@ -834,6 +856,13 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.renamePartitions(
+ TableIdentifier("tbl1", Some("db2")),
+ Seq(part1.spec), Seq(partWithEmptyValue.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("alter partitions") {
@@ -893,6 +922,11 @@ class SessionCatalogSuite extends PlanTest {
}
assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
"the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+ e = intercept[AnalysisException] {
+ catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partition names") {
@@ -914,10 +948,24 @@ class SessionCatalogSuite extends PlanTest {
test("list partition names with invalid partial partition spec") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
+ var e = intercept[AnalysisException] {
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
- Some(Map("unknown" -> "unknown")))
+ Some(partWithEmptyValue.spec))
}
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partitions") {
@@ -937,10 +985,22 @@ class SessionCatalogSuite extends PlanTest {
test("list partitions with invalid partial partition spec") {
val catalog = new SessionCatalog(newBasicCatalog())
- intercept[AnalysisException] {
- catalog.listPartitions(
- TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+ var e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+ Some(partWithUnknownColumns.spec))
+ }
+ assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+ "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+ e = intercept[AnalysisException] {
+ catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
}
+ assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+ "empty partition column value"))
}
test("list partitions when database/table does not exist") {