author	Takuya UESHIN <ueshin@happy-camper.st>	2017-03-03 16:35:54 -0800
committer	Wenchen Fan <wenchen@databricks.com>	2017-03-03 16:35:54 -0800
commit	2a7921a813ecd847fd933ffef10edc64684e9df7 (patch)
tree	e13297965da232e7e60c4ebc2f1e9ce050f11851 /sql/catalyst
parent	ba186a841fcfcd73a1530ca2418cc08bb0df92e1 (diff)
[SPARK-18939][SQL] Timezone support in partition values.
## What changes were proposed in this pull request?

This is a follow-up PR of #16308 and #16750. This PR enables timezone support in partition values.

We should use the `timeZone` option introduced at #16750 to parse/format partition values of the `TimestampType`.

For example, if you have a timestamp `"2016-01-01 00:00:00"` in `GMT` which will be used for partition values, the values written under the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS  ts=2016-01-01 00%3A00%3A00
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS  ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

And even if the timezones are different, we can properly read the values by setting the correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2015-12-31 16:00:00|
+---+-------------------+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+-------------------+
|  i|                 ts|
+---+-------------------+
|  1|2016-01-01 00:00:00|
+---+-------------------+
```

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #17053 from ueshin/issues/SPARK-18939.
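The `%3A` in the directory names is just the percent-encoded `:` from the formatted timestamp. As a minimal sketch (not part of the patch) of why the two directory names differ, here is the same instant formatted under the two timezone ids with plain Java time formatting:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// 1451606400000L ms since the epoch is 2016-01-01 00:00:00 GMT.
val ts = new java.sql.Timestamp(1451606400000L)
val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

fmt.setTimeZone(TimeZone.getTimeZone("GMT"))
println(fmt.format(ts))  // 2016-01-01 00:00:00

fmt.setTimeZone(TimeZone.getTimeZone("PST"))
println(fmt.format(ts))  // 2015-12-31 16:00:00
```

Because a partition value is stored as a formatted string, the timezone used when writing must be known when reading the value back; that is what the `timeZone` option and the new `defaultTimeZoneId` parameter in the diffs below carry.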
Diffstat (limited to 'sql/catalyst')
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala	4
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala	3
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala	2
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala	10
4 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index a3a4ab37ea..31eded4deb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -244,11 +244,13 @@ abstract class ExternalCatalog {
    * @param db database name
    * @param table table name
    * @param predicates partition-pruning predicates
+   * @param defaultTimeZoneId default timezone id to parse partition values of TimestampType
    */
   def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition]
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition]
 
   // --------------------------------------------------------------------------
   // Functions
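For concreteness, a hypothetical sketch (not code from this diff) of how a catalog subclass might honor the new parameter; `MyCatalog` is an assumed name, and it assumes the `ExternalCatalogUtils.prunePartitionsByFilter` helper carries the same timezone parameter after this patch:

```scala
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical catalog subclass (illustration only); its other abstract
// members are left unimplemented, hence the class stays abstract.
abstract class MyCatalog extends ExternalCatalog {
  override def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression],
      defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
    // List everything, then prune by evaluating the predicates against
    // partition values parsed with defaultTimeZoneId (a per-partition
    // "timeZone" storage property still wins inside toRow, see below).
    val catalogTable = getTable(db, table)
    val allPartitions = listPartitions(db, table)
    ExternalCatalogUtils.prunePartitionsByFilter(
      catalogTable, allPartitions, predicates, defaultTimeZoneId)
  }
}
```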
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 6bb2b2d4ff..340e8451f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -544,7 +544,8 @@ class InMemoryCatalog(
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
     // TODO: Provide an implementation
     throw new UnsupportedOperationException(
       "listPartitionsByFilter is not implemented for InMemoryCatalog")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 06734891b6..f6412e42c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -841,7 +841,7 @@ class SessionCatalog(
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.listPartitionsByFilter(db, table, predicates)
+    externalCatalog.listPartitionsByFilter(db, table, predicates, conf.sessionLocalTimeZone)
   }

   /**
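`conf.sessionLocalTimeZone` is backed by the `spark.sql.session.timeZone` configuration (falling back to the JVM default zone), so the session-level setting from the examples above is what flows into the catalog call. A short hedged illustration; the table name `t` is hypothetical:

```scala
// Partition values of TimestampType in table `t` will now be parsed as PST
// when partitions are pruned for this session.
spark.conf.set("spark.sql.session.timeZone", "PST")
spark.sql("SELECT * FROM t WHERE ts > timestamp '2016-01-01 00:00:00'").show()
```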
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cb939026f1..887caf07d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, Internal
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.StructType
@@ -113,11 +113,11 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
+    val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
+    val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZoneId)
     InternalRow.fromSeq(partitionSchema.map { field =>
-      // TODO: use correct timezone for partition values.
-      Cast(Literal(spec(field.name)), field.dataType,
-        Option(DateTimeUtils.defaultTimeZone().getID)).eval()
+      Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
     })
   }
 }
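To see the effect of the rewritten `Cast`, a minimal sketch (not from the patch) evaluating the same partition-value string under two timezone ids; the three-argument `Cast` constructor is the one used in the hunk above:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

val value = Literal("2016-01-01 00:00:00")

// Parsed as GMT: microseconds since the epoch for 2016-01-01 00:00:00 GMT.
val gmtMicros = Cast(value, TimestampType, Option("GMT")).eval()

// Parsed as PST: the same string denotes an instant eight hours later.
val pstMicros = Cast(value, TimestampType, Option("PST")).eval()
```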