path: root/sql/core/src/main
author     Takuya UESHIN <ueshin@happy-camper.st>   2017-03-03 16:35:54 -0800
committer  Wenchen Fan <wenchen@databricks.com>     2017-03-03 16:35:54 -0800
commit     2a7921a813ecd847fd933ffef10edc64684e9df7 (patch)
tree       e13297965da232e7e60c4ebc2f1e9ce050f11851 /sql/core/src/main
parent     ba186a841fcfcd73a1530ca2418cc08bb0df92e1 (diff)
[SPARK-18939][SQL] Timezone support in partition values.
## What changes were proposed in this pull request?

This is a follow-up PR of #16308 and #16750.

This PR enables timezone support in partition values. We should use the `timeZone` option introduced in #16750 to parse/format partition values of `TimestampType`.

For example, if you have a timestamp `"2016-01-01 00:00:00"` in `GMT` that will be used for partition values, the values written with the default timezone option (which is `"GMT"` here because the session local timezone is `"GMT"`) are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS  ts=2016-01-01 00%3A00%3A00
```

whereas with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS  ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values by setting the correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and some newly added tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #17053 from ueshin/issues/SPARK-18939.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala              | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala           |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala           | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala          | 42
5 files changed, 62 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index b02edd4c74..aa578f4d23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -103,11 +103,13 @@ case class OptimizeMetadataOnlyQuery(
case relation: CatalogRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
+ val caseInsensitiveProperties =
+ CaseInsensitiveMap(relation.tableMeta.storage.properties)
+ val timeZoneId = caseInsensitiveProperties.get("timeZone")
+ .getOrElse(conf.sessionLocalTimeZone)
val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
InternalRow.fromSeq(partAttrs.map { attr =>
- // TODO: use correct timezone for partition values.
- Cast(Literal(p.spec(attr.name)), attr.dataType,
- Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+ Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
})
}
LocalRelation(partAttrs, partitionData)
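
The change above resolves the timezone from the table's storage properties (case-insensitively) and falls back to the session-local timezone only when no `timeZone` option is set. A minimal standalone sketch of that lookup pattern, using plain Scala instead of Spark's internal `CaseInsensitiveMap` (names here are illustrative, not Spark API):

```scala
// Sketch of the option-resolution pattern used above: look up "timeZone"
// case-insensitively in the storage properties and fall back to the
// session-local timezone when the option is absent.
object TimeZoneOptionResolution {
  def resolveTimeZoneId(
      storageProperties: Map[String, String],
      sessionLocalTimeZone: String): String = {
    storageProperties
      .collectFirst { case (k, v) if k.equalsIgnoreCase("timeZone") => v }
      .getOrElse(sessionLocalTimeZone)
  }

  def main(args: Array[String]): Unit = {
    println(resolveTimeZoneId(Map("timezone" -> "PST"), "GMT")) // PST
    println(resolveTimeZoneId(Map.empty, "GMT"))                // GMT
  }
}
```
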
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 1235a4b12f..2068811661 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -72,7 +72,8 @@ class CatalogFileIndex(
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
- p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
new PrunedInMemoryFileIndex(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index c17796811c..950e5ca0d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -68,7 +68,8 @@ object FileFormatWriter extends Logging {
val bucketIdExpression: Option[Expression],
val path: String,
val customPartitionLocations: Map[TablePartitionSpec, String],
- val maxRecordsPerFile: Long)
+ val maxRecordsPerFile: Long,
+ val timeZoneId: String)
extends Serializable {
assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns),
@@ -122,9 +123,11 @@ object FileFormatWriter extends Logging {
spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
}
+ val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
// Note: prepareWrite has side effect. It sets "job".
val outputWriterFactory =
- fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
+ fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataColumns.toStructType)
val description = new WriteJobDescription(
uuid = UUID.randomUUID().toString,
@@ -136,8 +139,10 @@ object FileFormatWriter extends Logging {
bucketIdExpression = bucketIdExpression,
path = outputSpec.outputPath,
customPartitionLocations = outputSpec.customPartitionLocations,
- maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
- .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
+ maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
+ .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
+ timeZoneId = caseInsensitiveOptions.get("timeZone")
+ .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
)
// We should first sort by partition columns, then bucket id, and finally sorting columns.
@@ -330,11 +335,10 @@ object FileFormatWriter extends Logging {
/** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
private def partitionPathExpression: Seq[Expression] = {
desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
- // TODO: use correct timezone for partition values.
val escaped = ScalaUDF(
ExternalCatalogUtils.escapePathName _,
StringType,
- Seq(Cast(c, StringType, Option(DateTimeUtils.defaultTimeZone().getID))),
+ Seq(Cast(c, StringType, Option(desc.timeZoneId))),
Seq(StringType))
val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
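
With `desc.timeZoneId` flowing into the cast, the partition directory name now reflects the writer's `timeZone` option rather than the JVM default timezone. A rough standalone sketch of the effect; it only approximates `ExternalCatalogUtils.escapePathName`, and the `"yyyy-MM-dd HH:mm:ss"` pattern is an assumption:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Illustrative only: format a timestamp partition value in a chosen timezone and
// escape path-unsafe characters, mirroring the "ts=2015-12-31 16%3A00%3A00"
// directory names shown in the commit message.
object PartitionPathSketch {
  // Simplified escaping: the real escapePathName handles a larger character set.
  def escapePathName(s: String): String =
    s.flatMap {
      case c @ (':' | '/' | '\\') => f"%%${c.toInt}%02X"
      case c => c.toString
    }

  def partitionDirName(column: String, millis: Long, timeZoneId: String): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fmt.setTimeZone(TimeZone.getTimeZone(timeZoneId))
    s"$column=${escapePathName(fmt.format(new Date(millis)))}"
  }

  def main(args: Array[String]): Unit = {
    val ts = 1451606400000L // 2016-01-01 00:00:00 GMT
    println(partitionDirName("ts", ts, "GMT")) // ts=2016-01-01 00%3A00%3A00
    println(partitionDirName("ts", ts, "PST")) // ts=2015-12-31 16%3A00%3A00
  }
}
```
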
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 549257c0e1..c8097a7fab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -30,7 +30,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -125,22 +125,27 @@ abstract class PartitioningAwareFileIndex(
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq
+
+ val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
+ val timeZoneId = caseInsensitiveOptions.get("timeZone")
+ .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
userPartitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = false,
- basePaths = basePaths)
+ basePaths = basePaths,
+ timeZoneId = timeZoneId)
// Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
- // TODO: use correct timezone for partition values.
Cast(
Literal.create(row.getUTF8String(i), StringType),
userProvidedSchema.fields(i).dataType,
- Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+ Option(timeZoneId)).eval()
}: _*)
}
@@ -151,7 +156,8 @@ abstract class PartitioningAwareFileIndex(
PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
- basePaths = basePaths)
+ basePaths = basePaths,
+ timeZoneId = timeZoneId)
}
}
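
The key point in this file is that the same raw partition string is now interpreted in the timezone resolved from the `timeZone` option (or the session-local timezone) instead of the JVM default, both during discovery and when casting to a user-provided schema. A hypothetical sketch of why the timezone matters for that cast, using plain `java.text` rather than Spark's `Cast`:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Illustrative only: the same raw partition value maps to different instants
// depending on which timezone the string-to-timestamp conversion uses.
object CastPartitionValueSketch {
  def toEpochMillis(raw: String, timeZoneId: String): Long = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fmt.setTimeZone(TimeZone.getTimeZone(timeZoneId))
    fmt.parse(raw).getTime
  }

  def main(args: Array[String]): Unit = {
    val raw = "2015-12-31 16:00:00" // the unescaped PST partition value from the example
    println(toEpochMillis(raw, "GMT")) // 1451577600000
    println(toEpochMillis(raw, "PST")) // 1451606400000 == 2016-01-01 00:00:00 GMT
  }
}
```
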
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index bad59961ac..09876bbc2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import java.lang.{Double => JDouble, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date => JDate, Timestamp => JTimestamp}
+import java.util.TimeZone
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
@@ -31,7 +31,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
@@ -91,10 +93,19 @@ object PartitioningUtils {
private[datasources] def parsePartitions(
paths: Seq[Path],
typeInference: Boolean,
- basePaths: Set[Path]): PartitionSpec = {
+ basePaths: Set[Path],
+ timeZoneId: String): PartitionSpec = {
+ parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+ }
+
+ private[datasources] def parsePartitions(
+ paths: Seq[Path],
+ typeInference: Boolean,
+ basePaths: Set[Path],
+ timeZone: TimeZone): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
- parsePartition(path, typeInference, basePaths)
+ parsePartition(path, typeInference, basePaths, timeZone)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -173,7 +184,8 @@ object PartitioningUtils {
private[datasources] def parsePartition(
path: Path,
typeInference: Boolean,
- basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
+ basePaths: Set[Path],
+ timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -194,7 +206,7 @@ object PartitioningUtils {
// Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1.
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
- parsePartitionColumn(currentPath.getName, typeInference)
+ parsePartitionColumn(currentPath.getName, typeInference, timeZone)
maybeColumn.foreach(columns += _)
// Now, we determine if we should stop.
@@ -226,7 +238,8 @@ object PartitioningUtils {
private def parsePartitionColumn(
columnSpec: String,
- typeInference: Boolean): Option[(String, Literal)] = {
+ typeInference: Boolean,
+ timeZone: TimeZone): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -237,7 +250,7 @@ object PartitioningUtils {
val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
- val literal = inferPartitionColumnValue(rawColumnValue, typeInference)
+ val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
Some(columnName -> literal)
}
}
@@ -370,7 +383,8 @@ object PartitioningUtils {
*/
private[datasources] def inferPartitionColumnValue(
raw: String,
- typeInference: Boolean): Literal = {
+ typeInference: Boolean,
+ timeZone: TimeZone): Literal = {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
val bigDecimal = new JBigDecimal(raw)
@@ -390,8 +404,16 @@ object PartitioningUtils {
// Then falls back to fractional types
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
// Then falls back to date/timestamp types
- .orElse(Try(Literal(JDate.valueOf(raw))))
- .orElse(Try(Literal(JTimestamp.valueOf(unescapePathName(raw)))))
+ .orElse(Try(
+ Literal.create(
+ DateTimeUtils.getThreadLocalTimestampFormat(timeZone)
+ .parse(unescapePathName(raw)).getTime * 1000L,
+ TimestampType)))
+ .orElse(Try(
+ Literal.create(
+ DateTimeUtils.millisToDays(
+ DateTimeUtils.getThreadLocalDateFormat.parse(raw).getTime),
+ DateType)))
// Then falls back to string
.getOrElse {
if (raw == DEFAULT_PARTITION_NAME) {
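
The new fallbacks above replace `JDate.valueOf`/`JTimestamp.valueOf`, which always use the JVM default timezone, with timezone-aware parsing. A simplified standalone sketch of that fallback chain, with an assumed `"yyyy-MM-dd HH:mm:ss"` pattern standing in for `DateTimeUtils`' thread-local formats and GMT pinned for the day arithmetic to keep the output deterministic:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone
import scala.util.Try

// Illustrative only: try the (already unescaped) raw value as a timestamp in the
// supplied timezone (stored as microseconds, hence * 1000L), then as a date
// (stored as days since the epoch), then fall back to the plain string.
object InferPartitionValueSketch {
  private val MillisPerDay = 24L * 60 * 60 * 1000

  def inferValue(raw: String, timeZone: TimeZone): Any = {
    val tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    tsFormat.setTimeZone(timeZone)
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")) // the real code uses DateTimeUtils.millisToDays

    Try(tsFormat.parse(raw).getTime * 1000L)                             // TimestampType (micros)
      .orElse(Try((dateFormat.parse(raw).getTime / MillisPerDay).toInt)) // DateType (days)
      .getOrElse(raw)                                                    // StringType fallback
  }

  def main(args: Array[String]): Unit = {
    val gmt = TimeZone.getTimeZone("GMT")
    println(inferValue("2016-01-01 00:00:00", gmt)) // 1451606400000000
    println(inferValue("2016-01-01", gmt))          // 16801
    println(inferValue("not-a-date", gmt))          // not-a-date
  }
}
```
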