author    Wenchen Fan <wenchen@databricks.com>  2017-03-15 08:24:41 +0800
committer Wenchen Fan <wenchen@databricks.com>  2017-03-15 08:24:41 +0800
commit    dacc382f0c918f1ca808228484305ce0e21c705e (patch)
tree      fa222f88241a07e53f87695625d5c2c1fc9350d3
parent    7ded39c223429265b23940ca8244660dbee8320c (diff)
[SPARK-19887][SQL] dynamic partition keys can be null or empty string
## What changes were proposed in this pull request?

When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`; when we read the data back, we should respect this special directory name and treat it as null. This matches the behavior of Impala, see https://issues.apache.org/jira/browse/IMPALA-252

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17277 from cloud-fan/partition.
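As a rough illustration of the behavior described above, a minimal sketch assuming a spark-shell session with `spark.implicits._` in scope (the table and column names are made up for this example and are not part of the patch):

```scala
// Hypothetical table "t" with partition column "p". With this change, both the
// null and the empty-string partition values are written to the directory
// p=__HIVE_DEFAULT_PARTITION__ instead of an invalid/empty path segment.
Seq((1, "x"), (2, null: String), (3, "")).toDF("id", "p")
  .write.partitionBy("p").saveAsTable("t")

// When read back, the special directory name is treated as null, so rows 2 and 3
// come back with p = null and are pruned away by this filter.
spark.table("t").filter($"p".isNotNull).show()  // only (1, "x") remains
```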
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala 3
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala 24
7 files changed, 39 insertions, 16 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index a418edc302..a8693dcca5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -118,7 +118,7 @@ object ExternalCatalogUtils {
}
def getPartitionPathString(col: String, value: String): String = {
- val partitionString = if (value == null) {
+ val partitionString = if (value == null || value.isEmpty) {
DEFAULT_PARTITION_NAME
} else {
escapePathName(value)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b862deaf36..70ed44e025 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -116,7 +116,12 @@ case class CatalogTablePartition(
val timeZoneId = caseInsensitiveProperties.getOrElse(
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
InternalRow.fromSeq(partitionSchema.map { field =>
- Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
+ val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+ null
+ } else {
+ spec(field.name)
+ }
+ Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
})
}
}
@@ -164,7 +169,7 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
- * @param schemaPresevesCase Whether or not the schema resolved for this table is case-sensitive.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
* When using a Hive Metastore, this flag is set to false if a case-
* sensitive schema was unable to be read from the table properties.
* Used to trigger case-sensitive schema inference at query time, when
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 39b010efec..8ebad676ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -319,7 +319,7 @@ case class FileSourceScanExec(
val input = ctx.freshName("input")
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
val exprRows = output.zipWithIndex.map{ case (a, i) =>
- new BoundReference(i, a.dataType, a.nullable)
+ BoundReference(i, a.dataType, a.nullable)
}
val row = ctx.freshName("row")
ctx.INPUT_ROW = row
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index ce33298aeb..7957224ce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -335,14 +335,11 @@ object FileFormatWriter extends Logging {
/** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
private def partitionPathExpression: Seq[Expression] = {
desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
- val escaped = ScalaUDF(
- ExternalCatalogUtils.escapePathName _,
+ val partitionName = ScalaUDF(
+ ExternalCatalogUtils.getPartitionPathString _,
StringType,
- Seq(Cast(c, StringType, Option(desc.timeZoneId))),
- Seq(StringType))
- val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
- val partitionName = Literal(ExternalCatalogUtils.escapePathName(c.name) + "=") :: str :: Nil
- if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+ Seq(Literal(c.name), Cast(c, StringType, Option(desc.timeZoneId))))
+ if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 09876bbc2f..03980922ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
@@ -129,7 +128,7 @@ object PartitioningUtils {
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
// TODO: Selective case sensitivity.
- val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase())
+ val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
assert(
discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8860b7dc07..8a3c81ac8b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1012,8 +1012,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
val clientPartitionNames =
client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
- clientPartitionNames.map { partName =>
- val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
+ clientPartitionNames.map { partitionPath =>
+ val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
partSpec.map { case (partName, partValue) =>
partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
}.mkString("/")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 96385961c9..9440a17677 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -316,6 +316,28 @@ class PartitionProviderCompatibilitySuite
}
}
}
+
+ test(s"SPARK-19887 partition value is null - partition management $enabled") {
+ withTable("test") {
+ Seq((1, "p", 1), (2, null, 2)).toDF("a", "b", "c")
+ .write.partitionBy("b", "c").saveAsTable("test")
+ checkAnswer(spark.table("test"),
+ Row(1, "p", 1) :: Row(2, null, 2) :: Nil)
+
+ Seq((3, null: String, 3)).toDF("a", "b", "c")
+ .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+ checkAnswer(spark.table("test"),
+ Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Nil)
+ // make sure partition pruning also works.
+ checkAnswer(spark.table("test").filter($"b".isNotNull), Row(1, "p", 1))
+
+ // empty string is an invalid partition value and we treat it as null when read back.
+ Seq((4, "", 4)).toDF("a", "b", "c")
+ .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+ checkAnswer(spark.table("test"),
+ Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Row(4, null, 4) :: Nil)
+ }
+ }
}
/**