From 865b2fd84c6f82de147540c8f17bbe0f0d9fb69c Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 15 Feb 2017 13:26:34 -0800
Subject: [SPARK-18937][SQL] Timezone support in CSV/JSON parsing

## What changes were proposed in this pull request?

This is a follow-up PR of #16308. It enables timezone support in CSV/JSON parsing.

It introduces a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone). The datasources use the `timeZone` option to format timestamp values when writing and to parse them when reading.

Note that when reading, if the `timestampFormat` includes timezone info, the `timeZone` option is not used, because we should respect the timezone embedded in the values.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas, with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can read these files back correctly even if the timezone option is wrong, because the timestamp values carry timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if the `timestampFormat` doesn't contain timezone info, we can read the values correctly by setting the right timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` extend `TimeZoneAwareExpression` so that they can evaluate values with the timezone option.

## How was this patch tested?

Existing tests and some newly added tests.

Author: Takuya UESHIN

Closes #16750 from ueshin/issues/SPARK-18937.
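For completeness: the `timeZone` option drives the CSV datasource in the same way, and the CSV default `timestampFormat` is likewise `yyyy-MM-dd'T'HH:mm:ss.SSSZZ` (see `CSVOptions` in the diff below). A minimal sketch of the analogous CSV round-trip, reusing the hypothetical `df`, `schema`, and `GMT` session timezone from the examples above:

```scala
scala> df.write.option("timeZone", "PST").csv("/path/to/pstcsv")

scala> spark.read.schema(schema).option("timeZone", "PST").csv("/path/to/pstcsv").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

Because the default format carries a zone offset, the written values embed their timezone and read back correctly even if the reader's `timeZone` option differs, just as in the JSON case.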
---
 .../org/apache/spark/sql/DataFrameReader.scala       |  8 +++++++-
 .../org/apache/spark/sql/DataFrameWriter.scala       |  4 ++++
 .../main/scala/org/apache/spark/sql/Dataset.scala    |  6 ++++--
 .../execution/datasources/csv/CSVFileFormat.scala    |  8 ++++----
 .../sql/execution/datasources/csv/CSVOptions.scala   | 21 ++++++++-------------
 .../datasources/csv/UnivocityGenerator.scala         |  2 +-
 .../execution/datasources/csv/UnivocityParser.scala  |  2 +-
 .../execution/datasources/json/JsonFileFormat.scala  |  9 ++++++---
 .../spark/sql/streaming/DataStreamReader.scala       |  4 ++++
 9 files changed, 39 insertions(+), 25 deletions(-)

(limited to 'sql/core/src/main/scala/org')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839aee..780fe51ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba3e8..1d834b1821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 1.4.0
@@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b80ff48bb..e62cd9f7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans._
@@ -2678,10 +2678,12 @@ class Dataset[T] private[sql](
    */
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
+    val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
     val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
-      val gen = new JacksonGenerator(rowSchema, writer)
+      val gen = new JacksonGenerator(rowSchema, writer,
+        new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
 
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07047..566f40f454 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
 
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       dataSchema: StructType): OutputWriterFactory = {
     CSVUtils.verifySchema(dataSchema)
     val conf = job.getConfiguration
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     csvOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    val csvOptions = new CSVOptions(options)
+    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6ed1..b7fbaa4f44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
 import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
 
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
   extends Logging with Serializable {
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
 
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     name.map(CompressionCodecs.getCodecClassName)
   }
 
+  val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
   val dateFormat: FastDateFormat =
     FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
 
   val timestampFormat: FastDateFormat =
     FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
 
   val maxColumns = getInt("maxColumns", 20480)
 
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
     settings
   }
 }
-
-object CSVOptions {
-
-  def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
-  def apply(paramName: String, paramValue: String): CSVOptions = {
-    new CSVOptions(Map(paramName -> paramValue))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138c0f..4082a0df8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 private[csv] class UnivocityGenerator(
     schema: StructType,
     writer: Writer,
-    options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+    options: CSVOptions) {
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa60b0..2e409b3f5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
       name: String,
       dataType: DataType,
       nullable: Boolean = true,
-      options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+      options: CSVOptions): ValueConverter = dataType match {
 
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d2850..b4a8ff2cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     if (files.isEmpty) {
       None
     } else {
-      val parsedOptions: JSONOptions = new JSONOptions(options)
+      val parsedOptions: JSONOptions =
+        new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
       val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     parsedOptions.compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val parsedOptions: JSONOptions =
+      new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3cddb..4e706da184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * </ul>
    *
    * @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
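As a usage-level illustration of the `JsonToStruct`/`StructToJson` change mentioned in the commit message (those expressions live under `sql/catalyst` and are therefore outside the path filter of this diff), here is a hedged sketch using the public `to_json` function with an options map, reusing the hypothetical `df` from the examples above:

```scala
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df.select(to_json(struct($"ts"), Map("timeZone" -> "PST")).as("json")).show(false)
+--------------------------------------+
|json                                  |
+--------------------------------------+
|{"ts":"2015-12-31T16:00:00.000-08:00"}|
+--------------------------------------+
```

The `timeZone` entry in the options map plays the same role here as the datasource option, with the session local timezone as the default.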