From 865b2fd84c6f82de147540c8f17bbe0f0d9fb69c Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 15 Feb 2017 13:26:34 -0800
Subject: [SPARK-18937][SQL] Timezone support in CSV/JSON parsing

## What changes were proposed in this pull request?

This is a follow-up PR of #16308. It enables timezone support in CSV/JSON parsing.

It introduces a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone). The datasources use the `timeZone` option to format/parse timestamp values when writing/reading.

Note that while reading, if the `timestampFormat` contains timezone info, the `timeZone` option is not used, because the timezone embedded in the values should be respected.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the timezone option is wrong, because the timestamp values carry timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if the `timestampFormat` doesn't contain timezone info, we can properly read the values by setting the correct timezone option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression`s so that they can evaluate values with the timezone option.

## How was this patch tested?

Existing tests plus some added tests.

Author: Takuya UESHIN

Closes #16750 from ueshin/issues/SPARK-18937.
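The examples above exercise only the JSON path. As an illustrative sketch (not part of the original commit message), the same `timeZone` option applies to the CSV datasource in the same way; the snippet below assumes the `df` and `schema` values defined above, and the output path and format string are hypothetical:

```scala
// Sketch: the CSV analogue of the JSON examples above (path and format are illustrative).
// Because this timestampFormat carries no timezone field, the timeZone option decides
// how timestamps are rendered on write and how the strings are interpreted on read.
df.write
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("timeZone", "PST")
  .csv("/path/to/pstcsv")

// Reading back with the same timeZone option recovers the original timestamps;
// omitting it would interpret the strings in the session local timezone instead.
val readBack = spark.read
  .schema(schema)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("timeZone", "PST")
  .csv("/path/to/pstcsv")
```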
--- .../sql/catalyst/expressions/jsonExpressions.scala | 30 ++++-- .../spark/sql/catalyst/json/JSONOptions.scala | 11 +- .../spark/sql/catalyst/json/JacksonGenerator.scala | 2 +- .../expressions/JsonExpressionsSuite.scala | 113 +++++++++++++++++---- .../org/apache/spark/sql/DataFrameReader.scala | 8 +- .../org/apache/spark/sql/DataFrameWriter.scala | 4 + .../main/scala/org/apache/spark/sql/Dataset.scala | 6 +- .../execution/datasources/csv/CSVFileFormat.scala | 8 +- .../sql/execution/datasources/csv/CSVOptions.scala | 21 ++-- .../datasources/csv/UnivocityGenerator.scala | 2 +- .../datasources/csv/UnivocityParser.scala | 2 +- .../datasources/json/JsonFileFormat.scala | 9 +- .../spark/sql/streaming/DataStreamReader.scala | 4 + .../datasources/csv/CSVInferSchemaSuite.scala | 22 ++-- .../sql/execution/datasources/csv/CSVSuite.scala | 44 +++++++- .../datasources/csv/UnivocityParserSuite.scala | 73 +++++++------ .../sql/execution/datasources/json/JsonSuite.scala | 46 ++++++++- .../sql/sources/ResolvedDataSourceSuite.scala | 6 +- 18 files changed, 312 insertions(+), 99 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index c410e7919a..bd852a50fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression]) /** * Converts an json input string to a [[StructType]] with the specified schema. */ -case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { +case class JsonToStruct( + schema: StructType, + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true + def this(schema: StructType, options: Map[String, String], child: Expression) = + this(schema, options, child, None) + @transient lazy val parser = new JacksonParser( schema, "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`. - new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE))) + new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) override def dataType: DataType = schema + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + override def nullSafeEval(json: Any): Any = { try parser.parse(json.toString).headOption.orNull catch { case _: SparkSQLJsonProcessingException => null @@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: /** * Converts a [[StructType]] to a json output string. 
*/ -case class StructToJson(options: Map[String, String], child: Expression) - extends UnaryExpression with CodegenFallback with ExpectsInputTypes { +case class StructToJson( + options: Map[String, String], + child: Expression, + timeZoneId: Option[String] = None) + extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes { override def nullable: Boolean = true + def this(options: Map[String, String], child: Expression) = this(options, child, None) + @transient lazy val writer = new CharArrayWriter() @@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression) new JacksonGenerator( child.dataType.asInstanceOf[StructType], writer, - new JSONOptions(options)) + new JSONOptions(options, timeZoneId.get)) override def dataType: DataType = StringType @@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression) } } + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) + override def nullSafeEval(row: Any): Any = { gen.write(row.asInstanceOf[InternalRow]) gen.flush() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 02bd8dede4..5307ce1cb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.json -import java.util.Locale +import java.util.{Locale, TimeZone} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.commons.lang3.time.FastDateFormat @@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: CaseInsensitiveMap[String]) + @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String) extends Logging with Serializable { - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, String], defaultTimeZoneId: String) = + this(CaseInsensitiveMap(parameters), defaultTimeZoneId) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) @@ -58,13 +59,15 @@ private[sql] class JSONOptions( private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord") + val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. 
val dateFormat: FastDateFormat = FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US) val timestampFormat: FastDateFormat = FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US) + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US) // Parse mode flags if (!ParseModes.isValidMode(parseMode)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index bf8e3c812e..dec55279c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.types._ private[sql] class JacksonGenerator( schema: StructType, writer: Writer, - options: JSONOptions = new JSONOptions(Map.empty[String, String])) { + options: JSONOptions) { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 8e20bd1d97..0c46819cdb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.Calendar + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.ParseModes -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("json_tuple - hive key 4 - null json") { checkJsonTuple( JsonTuple(Literal(null) :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - hive key 5 - null and empty fields") { checkJsonTuple( JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null))) + InternalRow(UTF8String.fromString(""), null, null, null, null)) } test("json_tuple - hive key 6 - invalid json (array)") { checkJsonTuple( JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (object start only)") { checkJsonTuple( JsonTuple(Literal("{") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (no object end)") { checkJsonTuple( 
JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - invalid json (invalid json)") { checkJsonTuple( JsonTuple(Literal("\\") :: jsonTupleQuery), - InternalRow.fromSeq(Seq(null, null, null, null, null))) + InternalRow(null, null, null, null, null)) } test("json_tuple - preserve newlines") { checkJsonTuple( JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil), - InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc")))) + InternalRow(UTF8String.fromString("b\nc"))) } + val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID) + test("from_json") { val jsonData = """{"a": 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData)), - InternalRow.fromSeq(1 :: Nil) + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), + InternalRow(1) ) } @@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val jsonData = """{"a" 1}""" val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal(jsonData)), + JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId), null ) // Other modes should still return `null`. checkEvaluation( - JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)), + JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), null ) } @@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("from_json null input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(null, StringType)), + JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId), null ) } + test("from_json with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + + val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}""" + var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 123) + checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId), + InternalRow(c.getTimeInMillis * 1000L) + ) + // The result doesn't change because the json string includes timezone string ("Z" here), + // which means the string represents the timestamp string in the timezone regardless of + // the timeZoneId parameter. 
+ checkEvaluation( + JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")), + InternalRow(c.getTimeInMillis * 1000L) + ) + + val jsonData2 = """{"t": "2016-01-01T00:00:00"}""" + for (tz <- DateTimeTestUtils.ALL_TIMEZONES) { + c = Calendar.getInstance(tz) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 0) + checkEvaluation( + JsonToStruct( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), + Literal(jsonData2), + Option(tz.getID)), + InternalRow(c.getTimeInMillis * 1000L) + ) + checkEvaluation( + JsonToStruct( + schema, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID), + Literal(jsonData2), + gmtId), + InternalRow(c.getTimeInMillis * 1000L) + ) + } + } + test("SPARK-19543: from_json empty input column") { val schema = StructType(StructField("a", IntegerType) :: Nil) checkEvaluation( - JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)), + JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId), null ) } @@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(create_row(1), schema) checkEvaluation( - StructToJson(Map.empty, struct), + StructToJson(Map.empty, struct, gmtId), """{"a":1}""" ) } @@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val schema = StructType(StructField("a", IntegerType) :: Nil) val struct = Literal.create(null, schema) checkEvaluation( - StructToJson(Map.empty, struct), + StructToJson(Map.empty, struct, gmtId), null ) } + + test("to_json with timestamp") { + val schema = StructType(StructField("t", TimestampType) :: Nil) + val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + c.set(2016, 0, 1, 0, 0, 0) + c.set(Calendar.MILLISECOND, 0) + val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) + + checkEvaluation( + StructToJson(Map.empty, struct, gmtId), + """{"t":"2016-01-01T00:00:00.000Z"}""" + ) + checkEvaluation( + StructToJson(Map.empty, struct, Option("PST")), + """{"t":"2015-12-31T16:00:00.000-08:00"}""" + ) + + checkEvaluation( + StructToJson( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get), + struct, + gmtId), + """{"t":"2016-01-01T00:00:00"}""" + ) + checkEvaluation( + StructToJson( + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"), + struct, + gmtId), + """{"t":"2015-12-31T16:00:00"}""" + ) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 1830839aee..780fe51ac6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -27,6 +27,7 @@ import org.apache.spark.Partition import org.apache.spark.annotation.InterfaceStability import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource @@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
  • * * * @since 2.0.0 @@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap) + val parsedOptions: JSONOptions = + new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
  • `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 748ebba3e8..1d834b1821 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
  • * * * @since 1.4.0 @@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to format timestamps.</li>
  • * * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6b80ff48bb..e62cd9f7bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.json.JacksonGenerator +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans._ @@ -2678,10 +2678,12 @@ class Dataset[T] private[sql]( */ def toJSON: Dataset[String] = { val rowSchema = this.schema + val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter => val writer = new CharArrayWriter() // create the Generator without separator inserted between 2 records - val gen = new JacksonGenerator(rowSchema, writer) + val gen = new JacksonGenerator(rowSchema, writer, + new JSONOptions(Map.empty[String, String], sessionLocalTimeZone)) new Iterator[String] { override def hasNext: Boolean = iter.hasNext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 1d2bf07047..566f40f454 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -29,7 +29,7 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.sources._ @@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val paths = files.map(_.getPath.toString) val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths) val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis @@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { dataSchema: StructType): OutputWriterFactory = { CSVUtils.verifySchema(dataSchema) val conf = job.getConfiguration - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) csvOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { filters: Seq[Filter], options: Map[String, String], hadoopConf: 
Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val csvOptions = new CSVOptions(options) + val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 9d79ea6ed1..b7fbaa4f44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets -import java.util.Locale +import java.util.{Locale, TimeZone} import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling} import org.apache.commons.lang3.time.FastDateFormat @@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} -private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String]) +private[csv] class CSVOptions( + @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String) extends Logging with Serializable { - def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, String], defaultTimeZoneId: String) = + this(CaseInsensitiveMap(parameters), defaultTimeZoneId) private def getChar(paramName: String, default: Char): Char = { val paramValue = parameters.get(paramName) @@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive name.map(CompressionCodecs.getCodecClassName) } + val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) + // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. 
val dateFormat: FastDateFormat = FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US) val timestampFormat: FastDateFormat = FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US) + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US) val maxColumns = getInt("maxColumns", 20480) @@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive settings } } - -object CSVOptions { - - def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty)) - - def apply(paramName: String, paramValue: String): CSVOptions = { - new CSVOptions(Map(paramName -> paramValue)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala index ee79138c0f..4082a0df8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._ private[csv] class UnivocityGenerator( schema: StructType, writer: Writer, - options: CSVOptions = new CSVOptions(Map.empty[String, String])) { + options: CSVOptions) { private val writerSettings = options.asWriterSettings writerSettings.setHeaders(schema.fieldNames: _*) private val gen = new CsvWriter(writer, writerSettings) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 3b42aa60b0..2e409b3f5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -76,7 +76,7 @@ private[csv] class UnivocityParser( name: String, dataType: DataType, nullable: Boolean = true, - options: CSVOptions = CSVOptions()): ValueConverter = dataType match { + options: CSVOptions): ValueConverter = dataType match { case _: ByteType => (d: String) => nullSafeDatum(d, name, nullable, options)(_.toByte) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 98ab9d2850..b4a8ff2cf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { if (files.isEmpty) { None } else { - val parsedOptions: JSONOptions = new JSONOptions(options) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) @@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - val parsedOptions: JSONOptions = new JSONOptions(options) + val parsedOptions: JSONOptions = + new JSONOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone) parsedOptions.compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - val parsedOptions: JSONOptions = new JSONOptions(options) + val parsedOptions: JSONOptions = + new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index b7ffb3cddb..4e706da184 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
  • * * * @since 2.0.0 @@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+   * to be used to parse timestamps.</li>
    * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
    * a record can have.</li>
  • `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index d8c6c25504..6617420871 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite { test("String fields types are inferred correctly from null types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(NullType, "", options) == NullType) assert(CSVInferSchema.inferField(NullType, null, options) == NullType) assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType) @@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("String fields types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType) assert(CSVInferSchema.inferField(LongType, "test", options) == StringType) assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType) @@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Timestamp field types are inferred correctly via custom data format") { - var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm")) + var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) - options = new CSVOptions(Map("timestampFormat" -> "yyyy")) + options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType) } test("Timestamp field types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType) assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType) } test("Boolean fields types are inferred correctly from other types") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map.empty[String, String], "GMT") assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType) assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType) } @@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("Null fields are handled properly when a nullValue is specified") { - var options = new CSVOptions(Map("nullValue" -> "null")) + var options = new CSVOptions(Map("nullValue" -> "null"), "GMT") assert(CSVInferSchema.inferField(NullType, "null", options) == NullType) assert(CSVInferSchema.inferField(StringType, "null", options) == StringType) assert(CSVInferSchema.inferField(LongType, "null", options) == LongType) - options = new CSVOptions(Map("nullValue" -> "\\N")) + options = new CSVOptions(Map("nullValue" -> "\\N"), 
"GMT") assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType) assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType) assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType) @@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm")) + val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT") assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType) } test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") { - val options = new CSVOptions(Map.empty[String, String]) + val options = new CSVOptions(Map.empty[String, String], "GMT") // 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9). assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) == @@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite { test("DoubleType should be infered when user defined nan/inf are provided") { val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf", - "positiveInf" -> "inf")) + "positiveInf" -> "inf"), "GMT") assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType) assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType) assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index df9cebbe58..0c9a7298c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } - test("Write timestamps correctly with dateFormat option") { + test("Write timestamps correctly with timestampFormat option") { withTempDir { dir => // With dateFormat option. val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv" @@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } } + test("Write timestamps correctly with timestampFormat option and timeZone option") { + withTempDir { dir => + // With dateFormat option and timeZone option. + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv" + val timestampsWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .load(testFile(datesFile)) + timestampsWithFormat.write + .format("csv") + .option("header", "true") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .save(timestampsWithFormatPath) + + // This will load back the timestamps as string. 
+ val stringTimestampsWithFormat = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "false") + .load(timestampsWithFormatPath) + val expectedStringTimestampsWithFormat = Seq( + Row("2015/08/27 01:00"), + Row("2014/10/28 01:30"), + Row("2016/01/29 04:00")) + + checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat) + + val readBack = spark.read + .format("csv") + .option("header", "true") + .option("inferSchema", "true") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .load(timestampsWithFormatPath) + + checkAnswer(readBack, timestampsWithFormat) + } + } + test("load duplicated field names consistently with null or empty strings - case sensitive") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { withTempPath { path => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala index 62dae08861..a74b22a4a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String class UnivocityParserSuite extends SparkFunSuite { private val parser = - new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String])) + new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT")) private def assertNull(v: Any) = assert(v == null) @@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite { stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) => val decimalValue = new BigDecimal(decimalVal.toString) - assert(parser.makeConverter("_1", decimalType).apply(strVal) === + val options = new CSVOptions(Map.empty[String, String], "GMT") + assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale)) } } @@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite { // Nullable field with nullValue option. types.foreach { t => // Tests that a custom nullValue. + val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT") val converter = - parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-")) + parser.makeConverter("_1", t, nullable = true, options = nullValueOptions) assertNull(converter.apply("-")) assertNull(converter.apply(null)) // Tests that the default nullValue is empty string. - assertNull(parser.makeConverter("_1", t, nullable = true).apply("")) + val options = new CSVOptions(Map.empty[String, String], "GMT") + assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply("")) } // Not nullable field with nullValue option. types.foreach { t => // Casts a null to not nullable field should throw an exception. + val options = new CSVOptions(Map("nullValue" -> "-"), "GMT") val converter = - parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-")) + parser.makeConverter("_1", t, nullable = false, options = options) var message = intercept[RuntimeException] { converter.apply("-") }.getMessage @@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite { // If nullValue is different with empty string, then, empty string should not be casted into // null. 
Seq(true, false).foreach { b => + val options = new CSVOptions(Map("nullValue" -> "null"), "GMT") val converter = - parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null")) + parser.makeConverter("_1", StringType, nullable = b, options = options) assert(converter.apply("") == UTF8String.fromString("")) } } test("Throws exception for empty string with non null type") { + val options = new CSVOptions(Map.empty[String, String], "GMT") val exception = intercept[RuntimeException]{ - parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("") + parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("") } assert(exception.getMessage.contains("null value found but field _1 is not nullable.")) } test("Types are cast correctly") { - assert(parser.makeConverter("_1", ByteType).apply("10") == 10) - assert(parser.makeConverter("_1", ShortType).apply("10") == 10) - assert(parser.makeConverter("_1", IntegerType).apply("10") == 10) - assert(parser.makeConverter("_1", LongType).apply("10") == 10) - assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0) - assert(parser.makeConverter("_1", BooleanType).apply("true") == true) - - val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm") + val options = new CSVOptions(Map.empty[String, String], "GMT") + assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10) + assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0) + assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true) + + val timestampsOptions = + new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT") val customTimestamp = "31/01/2015 00:00" val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime val castedTimestamp = - parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions) + parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions) .apply(customTimestamp) assert(castedTimestamp == expectedTime * 1000L) val customDate = "31/01/2015" - val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy") + val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT") val expectedDate = dateOptions.dateFormat.parse(customDate).getTime val castedDate = - parser.makeConverter("_1", DateType, nullable = true, dateOptions) + parser.makeConverter("_1", DateType, nullable = true, options = dateOptions) .apply(customTimestamp) assert(castedDate == DateTimeUtils.millisToDays(expectedDate)) val timestamp = "2015-01-01 00:00:00" - assert(parser.makeConverter("_1", TimestampType).apply(timestamp) == + assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) == DateTimeUtils.stringToTime(timestamp).getTime * 1000L) - assert(parser.makeConverter("_1", DateType).apply("2015-01-01") == + assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") == DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime)) } @@ -127,16 +135,18 @@ class UnivocityParserSuite 
extends SparkFunSuite { try { Locale.setDefault(new Locale("fr", "FR")) // Would parse as 1.0 in fr-FR - assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0) - assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0) + val options = new CSVOptions(Map.empty[String, String], "GMT") + assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0) + assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0) } finally { Locale.setDefault(originalLocale) } } test("Float NaN values are parsed correctly") { + val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT") val floatVal: Float = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn") + "_1", FloatType, nullable = true, options = options ).apply("nn").asInstanceOf[Float] // Java implements the IEEE-754 floating point standard which guarantees that any comparison @@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite { } test("Double NaN values are parsed correctly") { + val options = new CSVOptions(Map("nanValue" -> "-"), "GMT") val doubleVal: Double = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-") + "_1", DoubleType, nullable = true, options = options ).apply("-").asInstanceOf[Double] assert(doubleVal.isNaN) } test("Float infinite values can be parsed") { + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT") val floatVal1 = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max") + "_1", FloatType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Float] assert(floatVal1 == Float.NegativeInfinity) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT") val floatVal2 = parser.makeConverter( - "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max") + "_1", FloatType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Float] assert(floatVal2 == Float.PositiveInfinity) } test("Double infinite values can be parsed") { + val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT") val doubleVal1 = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max") + "_1", DoubleType, nullable = true, options = negativeInfOptions ).apply("max").asInstanceOf[Double] assert(doubleVal1 == Double.NegativeInfinity) + val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT") val doubleVal2 = parser.makeConverter( - "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max") + "_1", DoubleType, nullable = true, options = positiveInfOptions ).apply("max").asInstanceOf[Double] assert(doubleVal2 == Double.PositiveInfinity) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 156fd965b4..9344aeda00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { generator.flush() } - val dummyOption = new JSONOptions(Map.empty[String, String]) + val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, "", dummyOption) @@ -1366,7 
+1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception - val emptySchema = JsonInferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String])) + val emptySchema = JsonInferSchema.infer( + empty, "", new JSONOptions(Map.empty[String, String], "GMT")) assert(StructType(Seq()) === emptySchema) } @@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-8093 Erase empty structs") { val emptySchema = JsonInferSchema.infer( - emptyRecords, "", new JSONOptions(Map.empty[String, String])) + emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT")) assert(StructType(Seq()) === emptySchema) } @@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - test("Write timestamps correctly with dateFormat option") { + test("Write timestamps correctly with timestampFormat option") { val customSchema = new StructType(Array(StructField("date", TimestampType, true))) withTempDir { dir => // With dateFormat option. @@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("Write timestamps correctly with timestampFormat option and timeZone option") { + val customSchema = new StructType(Array(StructField("date", TimestampType, true))) + withTempDir { dir => + // With dateFormat option and timeZone option. + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .schema(customSchema) + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .json(datesRecords) + timestampsWithFormat.write + .format("json") + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .save(timestampsWithFormatPath) + + // This will load back the timestamps as string. 
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil) + val stringTimestampsWithFormat = spark.read + .schema(stringSchema) + .json(timestampsWithFormatPath) + val expectedStringDatesWithFormat = Seq( + Row("2015/08/27 01:00"), + Row("2014/10/28 01:30"), + Row("2016/01/29 04:00")) + + checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat) + + val readBack = spark.read + .schema(customSchema) + .option("timestampFormat", "yyyy/MM/dd HH:mm") + .option("timeZone", "GMT") + .json(timestampsWithFormatPath) + + checkAnswer(readBack, timestampsWithFormat) + } + } + test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { val records = sparkContext .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 76ffb949f1..9b5e364e51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -19,11 +19,15 @@ package org.apache.spark.sql.sources import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSource class ResolvedDataSourceSuite extends SparkFunSuite { private def getProvidingClass(name: String): Class[_] = - DataSource(sparkSession = null, className = name).providingClass + DataSource( + sparkSession = null, + className = name, + options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass test("jdbc") { assert( -- cgit v1.2.3