author     Takuya UESHIN <ueshin@happy-camper.st>  2017-02-15 13:26:34 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-02-15 13:26:34 -0800
commit     865b2fd84c6f82de147540c8f17bbe0f0d9fb69c (patch)
tree       554bb50328d64206b8ab36eb8d3af1be5e1c7fd1 /sql
parent     6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f (diff)
download   spark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.tar.gz
           spark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.tar.bz2
           spark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.zip
[SPARK-18937][SQL] Timezone support in CSV/JSON parsing
## What changes were proposed in this pull request?

This is a follow-up PR of #16308. This PR enables timezone support in CSV/JSON parsing.

It introduces a `timeZone` option for the CSV/JSON datasources (the default value of the option is the session local timezone). The datasources use the `timeZone` option to format/parse timestamp values when writing/reading. Note that while reading, if the `timestampFormat` includes timezone info, the `timeZone` option is not used, because the timezone embedded in the values is respected.

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because the session local timezone is `"GMT"` here, are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> df.write.json("/path/to/gmtjson")
```

```sh
$ cat /path/to/gmtjson/part-*
{"ts":"2016-01-01T00:00:00.000Z"}
```

whereas, with the option set to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", "PST").json("/path/to/pstjson")
```

```sh
$ cat /path/to/pstjson/part-*
{"ts":"2015-12-31T16:00:00.000-08:00"}
```

We can properly read these files even if the `timeZone` option is wrong, because the timestamp values carry timezone info:

```scala
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> spark.read.schema(schema).json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+

scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

And even if the `timestampFormat` doesn't contain timezone info, we can properly read the values by setting the correct `timeZone` option:

```scala
scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson")
```

```sh
$ cat /path/to/jstjson/part-*
{"ts":"2016-01-01T09:00:00"}
```

```scala
// wrong result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 09:00:00|
+-------------------+

// correct result
scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show()
+-------------------+
|ts                 |
+-------------------+
|2016-01-01 00:00:00|
+-------------------+
```

This PR also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression`s so that they can evaluate values with the timezone option.

## How was this patch tested?

Existing tests and some added tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #16750 from ueshin/issues/SPARK-18937.
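For reference, a small illustrative spark-shell sketch (not part of the patch) of the same behavior seen through the `from_json`/`to_json` functions, which are backed by the `JsonToStruct`/`StructToJson` expressions changed below. It reuses the `df` and `schema` from the examples above and assumes the session timezone is still `"GMT"`; the outputs in the comments are the expected results under the new semantics, not captured shell output:

```scala
scala> import org.apache.spark.sql.functions.{col, from_json, struct, to_json}

// The "timeZone" key in the options map is the new option added by this patch.
scala> val written = df.select(to_json(struct(col("ts")), Map("timeZone" -> "PST")).as("json"))

scala> written.show(false)
// expected value: {"ts":"2015-12-31T16:00:00.000-08:00"}

scala> written.select(from_json(col("json"), schema, Map("timeZone" -> "PST")).as("parsed")).show(false)
// expected value: [2016-01-01 00:00:00] -- the offset embedded in the string is respected
```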
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 30
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala | 11
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 113
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 44
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala | 73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 6
18 files changed, 312 insertions, 99 deletions
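As orientation for the hunks that follow: the core API change is that `JSONOptions` and `CSVOptions` now take a default timezone id alongside the options map (the parameterless defaults and the `CSVOptions` factory object are removed), so every caller passes the session local timezone explicitly, while a per-source `timeZone` option can still override it. A minimal sketch of the new constructor shape, mirroring how the test changes below build these objects (both classes are Spark-internal, and the option values here are illustrative):

```scala
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.execution.datasources.csv.CSVOptions

// Old (removed by this patch): new JSONOptions(Map.empty[String, String])
// New: a default timezone id is required; in production code it comes from
// sparkSession.sessionState.conf.sessionLocalTimeZone, in the suites below it is "GMT".
val jsonOptions = new JSONOptions(Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), "GMT")

// A "timeZone" entry in the options map still takes precedence over the default.
// Note: both classes are private to Spark SQL's internals, so this only compiles
// from within those packages, as in the test suites below.
val csvOptions = new CSVOptions(Map("timeZone" -> "PST"), "GMT")
```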
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c410e7919a..bd852a50fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
/**
* Converts an json input string to a [[StructType]] with the specified schema.
*/
-case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class JsonToStruct(
+ schema: StructType,
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
+ def this(schema: StructType, options: Map[String, String], child: Expression) =
+ this(schema, options, child, None)
+
@transient
lazy val parser =
new JacksonParser(
schema,
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
- new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
override def dataType: DataType = schema
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(json: Any): Any = {
try parser.parse(json.toString).headOption.orNull catch {
case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
/**
* Converts a [[StructType]] to a json output string.
*/
-case class StructToJson(options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+case class StructToJson(
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
+ def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
@transient
lazy val writer = new CharArrayWriter()
@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
new JacksonGenerator(
child.dataType.asInstanceOf[StructType],
writer,
- new JSONOptions(options))
+ new JSONOptions(options, timeZoneId.get))
override def dataType: DataType = StringType
@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
}
}
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(row: Any): Any = {
gen.write(row.asInstanceOf[InternalRow])
gen.flush()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02bd8dede4..5307ce1cb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.json
-import java.util.Locale
+import java.util.{Locale, TimeZone}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
@@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
- @transient private val parameters: CaseInsensitiveMap[String])
+ @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+ this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -58,13 +59,15 @@ private[sql] class JSONOptions(
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+ val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index bf8e3c812e..dec55279c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types._
private[sql] class JacksonGenerator(
schema: StructType,
writer: Writer,
- options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+ options: JSONOptions) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
// JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8e20bd1d97..0c46819cdb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql.catalyst.expressions
+import java.util.Calendar
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -305,51 +307,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("json_tuple - hive key 4 - null json") {
checkJsonTuple(
JsonTuple(Literal(null) :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - hive key 5 - null and empty fields") {
checkJsonTuple(
JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null)))
+ InternalRow(UTF8String.fromString(""), null, null, null, null))
}
test("json_tuple - hive key 6 - invalid json (array)") {
checkJsonTuple(
JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (object start only)") {
checkJsonTuple(
JsonTuple(Literal("{") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (no object end)") {
checkJsonTuple(
JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - invalid json (invalid json)") {
checkJsonTuple(
JsonTuple(Literal("\\") :: jsonTupleQuery),
- InternalRow.fromSeq(Seq(null, null, null, null, null)))
+ InternalRow(null, null, null, null, null))
}
test("json_tuple - preserve newlines") {
checkJsonTuple(
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
- InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
+ InternalRow(UTF8String.fromString("b\nc")))
}
+ val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
+
test("from_json") {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData)),
- InternalRow.fromSeq(1 :: Nil)
+ JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+ InternalRow(1)
)
}
@@ -357,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
null
)
// Other modes should still return `null`.
checkEvaluation(
- JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+ JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
null
)
}
@@ -371,15 +375,58 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
+ JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
null
)
}
+ test("from_json with timestamp") {
+ val schema = StructType(StructField("t", TimestampType) :: Nil)
+
+ val jsonData1 = """{"t": "2016-01-01T00:00:00.123Z"}"""
+ var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 123)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ // The result doesn't change because the json string includes timezone string ("Z" here),
+ // which means the string represents the timestamp string in the timezone regardless of
+ // the timeZoneId parameter.
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+
+ val jsonData2 = """{"t": "2016-01-01T00:00:00"}"""
+ for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+ c = Calendar.getInstance(tz)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 0)
+ checkEvaluation(
+ JsonToStruct(
+ schema,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
+ Literal(jsonData2),
+ Option(tz.getID)),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ checkEvaluation(
+ JsonToStruct(
+ schema,
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID),
+ Literal(jsonData2),
+ gmtId),
+ InternalRow(c.getTimeInMillis * 1000L)
+ )
+ }
+ }
+
test("SPARK-19543: from_json empty input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal.create(" ", StringType)),
+ JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
null
)
}
@@ -388,7 +435,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
- StructToJson(Map.empty, struct),
+ StructToJson(Map.empty, struct, gmtId),
"""{"a":1}"""
)
}
@@ -397,8 +444,40 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(null, schema)
checkEvaluation(
- StructToJson(Map.empty, struct),
+ StructToJson(Map.empty, struct, gmtId),
null
)
}
+
+ test("to_json with timestamp") {
+ val schema = StructType(StructField("t", TimestampType) :: Nil)
+ val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT)
+ c.set(2016, 0, 1, 0, 0, 0)
+ c.set(Calendar.MILLISECOND, 0)
+ val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
+
+ checkEvaluation(
+ StructToJson(Map.empty, struct, gmtId),
+ """{"t":"2016-01-01T00:00:00.000Z"}"""
+ )
+ checkEvaluation(
+ StructToJson(Map.empty, struct, Option("PST")),
+ """{"t":"2015-12-31T16:00:00.000-08:00"}"""
+ )
+
+ checkEvaluation(
+ StructToJson(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get),
+ struct,
+ gmtId),
+ """{"t":"2016-01-01T00:00:00"}"""
+ )
+ checkEvaluation(
+ StructToJson(
+ Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
+ struct,
+ gmtId),
+ """{"t":"2015-12-31T16:00:00"}"""
+ )
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839aee..780fe51ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def json(jsonRDD: RDD[String]): DataFrame = {
- val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba3e8..1d834b1821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
* </ul>
*
* @since 1.4.0
@@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
* </ul>
*
* @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b80ff48bb..e62cd9f7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans._
@@ -2678,10 +2678,12 @@ class Dataset[T] private[sql](
*/
def toJSON: Dataset[String] = {
val rowSchema = this.schema
+ val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- val gen = new JacksonGenerator(rowSchema, writer)
+ val gen = new JacksonGenerator(rowSchema, writer,
+ new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07047..566f40f454 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
files: Seq[FileStatus]): Option[StructType] = {
require(files.nonEmpty, "Cannot infer schema from an empty set of files")
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val paths = files.map(_.getPath.toString)
val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
dataSchema: StructType): OutputWriterFactory = {
CSVUtils.verifySchema(dataSchema)
val conf = job.getConfiguration
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
csvOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6ed1..b7fbaa4f44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+ @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+ this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
name.map(CompressionCodecs.getCodecClassName)
}
+ val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
val maxColumns = getInt("maxColumns", 20480)
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
settings
}
}
-
-object CSVOptions {
-
- def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
- def apply(paramName: String, paramValue: String): CSVOptions = {
- new CSVOptions(Map(paramName -> paramValue))
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138c0f..4082a0df8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
private[csv] class UnivocityGenerator(
schema: StructType,
writer: Writer,
- options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+ options: CSVOptions) {
private val writerSettings = options.asWriterSettings
writerSettings.setHeaders(schema.fieldNames: _*)
private val gen = new CsvWriter(writer, writerSettings)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa60b0..2e409b3f5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
name: String,
dataType: DataType,
nullable: Boolean = true,
- options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+ options: CSVOptions): ValueConverter = dataType match {
case _: ByteType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(_.toByte)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d2850..b4a8ff2cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
if (files.isEmpty) {
None
} else {
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
parsedOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3cddb..4e706da184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index d8c6c25504..6617420871 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
class CSVInferSchemaSuite extends SparkFunSuite {
test("String fields types are inferred correctly from null types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(NullType, "", options) == NullType)
assert(CSVInferSchema.inferField(NullType, null, options) == NullType)
assert(CSVInferSchema.inferField(NullType, "100000000000", options) == LongType)
@@ -41,7 +41,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("String fields types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(LongType, "1.0", options) == DoubleType)
assert(CSVInferSchema.inferField(LongType, "test", options) == StringType)
assert(CSVInferSchema.inferField(IntegerType, "1.0", options) == DoubleType)
@@ -60,21 +60,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Timestamp field types are inferred correctly via custom data format") {
- var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
+ var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
- options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
+ options = new CSVOptions(Map("timestampFormat" -> "yyyy"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}
test("Timestamp field types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
assert(CSVInferSchema.inferField(DoubleType, "2015-08-20 14:10", options) == StringType)
assert(CSVInferSchema.inferField(LongType, "2015-08 14:49:00", options) == StringType)
}
test("Boolean fields types are inferred correctly from other types") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(LongType, "Fale", options) == StringType)
assert(CSVInferSchema.inferField(DoubleType, "TRUEe", options) == StringType)
}
@@ -92,12 +92,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Null fields are handled properly when a nullValue is specified") {
- var options = new CSVOptions(Map("nullValue" -> "null"))
+ var options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
assert(CSVInferSchema.inferField(NullType, "null", options) == NullType)
assert(CSVInferSchema.inferField(StringType, "null", options) == StringType)
assert(CSVInferSchema.inferField(LongType, "null", options) == LongType)
- options = new CSVOptions(Map("nullValue" -> "\\N"))
+ options = new CSVOptions(Map("nullValue" -> "\\N"), "GMT")
assert(CSVInferSchema.inferField(IntegerType, "\\N", options) == IntegerType)
assert(CSVInferSchema.inferField(DoubleType, "\\N", options) == DoubleType)
assert(CSVInferSchema.inferField(TimestampType, "\\N", options) == TimestampType)
@@ -111,12 +111,12 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+ val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"), "GMT")
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
}
test("SPARK-18877: `inferField` on DecimalType should find a common type with `typeSoFar`") {
- val options = new CSVOptions(Map.empty[String, String])
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
// 9.03E+12 is Decimal(3, -10) and 1.19E+11 is Decimal(3, -9).
assert(CSVInferSchema.inferField(DecimalType(3, -10), "1.19E+11", options) ==
@@ -134,7 +134,7 @@ class CSVInferSchemaSuite extends SparkFunSuite {
test("DoubleType should be infered when user defined nan/inf are provided") {
val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf",
- "positiveInf" -> "inf"))
+ "positiveInf" -> "inf"), "GMT")
assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)
assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType)
assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index df9cebbe58..0c9a7298c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -839,7 +839,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
- test("Write timestamps correctly with dateFormat option") {
+ test("Write timestamps correctly with timestampFormat option") {
withTempDir { dir =>
// With dateFormat option.
val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
@@ -870,6 +870,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
+ test("Write timestamps correctly with timestampFormat option and timeZone option") {
+ withTempDir { dir =>
+ // With dateFormat option and timeZone option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+ val timestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestampsWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringTimestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/27 01:00"),
+ Row("2014/10/28 01:30"),
+ Row("2016/01/29 04:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+
+ val readBack = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .load(timestampsWithFormatPath)
+
+ checkAnswer(readBack, timestampsWithFormat)
+ }
+ }
+
test("load duplicated field names consistently with null or empty strings - case sensitive") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
withTempPath { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 62dae08861..a74b22a4a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
class UnivocityParserSuite extends SparkFunSuite {
private val parser =
- new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String]))
+ new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
private def assertNull(v: Any) = assert(v == null)
@@ -38,7 +38,8 @@ class UnivocityParserSuite extends SparkFunSuite {
stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
val decimalValue = new BigDecimal(decimalVal.toString)
- assert(parser.makeConverter("_1", decimalType).apply(strVal) ===
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
Decimal(decimalValue, decimalType.precision, decimalType.scale))
}
}
@@ -50,20 +51,23 @@ class UnivocityParserSuite extends SparkFunSuite {
// Nullable field with nullValue option.
types.foreach { t =>
// Tests that a custom nullValue.
+ val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), "GMT")
val converter =
- parser.makeConverter("_1", t, nullable = true, CSVOptions("nullValue", "-"))
+ parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
assertNull(converter.apply("-"))
assertNull(converter.apply(null))
// Tests that the default nullValue is empty string.
- assertNull(parser.makeConverter("_1", t, nullable = true).apply(""))
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
}
// Not nullable field with nullValue option.
types.foreach { t =>
// Casts a null to not nullable field should throw an exception.
+ val options = new CSVOptions(Map("nullValue" -> "-"), "GMT")
val converter =
- parser.makeConverter("_1", t, nullable = false, CSVOptions("nullValue", "-"))
+ parser.makeConverter("_1", t, nullable = false, options = options)
var message = intercept[RuntimeException] {
converter.apply("-")
}.getMessage
@@ -77,48 +81,52 @@ class UnivocityParserSuite extends SparkFunSuite {
// If nullValue is different with empty string, then, empty string should not be casted into
// null.
Seq(true, false).foreach { b =>
+ val options = new CSVOptions(Map("nullValue" -> "null"), "GMT")
val converter =
- parser.makeConverter("_1", StringType, nullable = b, CSVOptions("nullValue", "null"))
+ parser.makeConverter("_1", StringType, nullable = b, options = options)
assert(converter.apply("") == UTF8String.fromString(""))
}
}
test("Throws exception for empty string with non null type") {
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
val exception = intercept[RuntimeException]{
- parser.makeConverter("_1", IntegerType, nullable = false, CSVOptions()).apply("")
+ parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
}
test("Types are cast correctly") {
- assert(parser.makeConverter("_1", ByteType).apply("10") == 10)
- assert(parser.makeConverter("_1", ShortType).apply("10") == 10)
- assert(parser.makeConverter("_1", IntegerType).apply("10") == 10)
- assert(parser.makeConverter("_1", LongType).apply("10") == 10)
- assert(parser.makeConverter("_1", FloatType).apply("1.00") == 1.0)
- assert(parser.makeConverter("_1", DoubleType).apply("1.00") == 1.0)
- assert(parser.makeConverter("_1", BooleanType).apply("true") == true)
-
- val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", LongType, options = options).apply("10") == 10)
+ assert(parser.makeConverter("_1", FloatType, options = options).apply("1.00") == 1.0)
+ assert(parser.makeConverter("_1", DoubleType, options = options).apply("1.00") == 1.0)
+ assert(parser.makeConverter("_1", BooleanType, options = options).apply("true") == true)
+
+ val timestampsOptions =
+ new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), "GMT")
val customTimestamp = "31/01/2015 00:00"
val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
val castedTimestamp =
- parser.makeConverter("_1", TimestampType, nullable = true, timestampsOptions)
+ parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)
val customDate = "31/01/2015"
- val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+ val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), "GMT")
val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
val castedDate =
- parser.makeConverter("_1", DateType, nullable = true, dateOptions)
+ parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
.apply(customTimestamp)
assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
val timestamp = "2015-01-01 00:00:00"
- assert(parser.makeConverter("_1", TimestampType).apply(timestamp) ==
+ assert(parser.makeConverter("_1", TimestampType, options = options).apply(timestamp) ==
DateTimeUtils.stringToTime(timestamp).getTime * 1000L)
- assert(parser.makeConverter("_1", DateType).apply("2015-01-01") ==
+ assert(parser.makeConverter("_1", DateType, options = options).apply("2015-01-01") ==
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime("2015-01-01").getTime))
}
@@ -127,16 +135,18 @@ class UnivocityParserSuite extends SparkFunSuite {
try {
Locale.setDefault(new Locale("fr", "FR"))
// Would parse as 1.0 in fr-FR
- assert(parser.makeConverter("_1", FloatType).apply("1,00") == 100.0)
- assert(parser.makeConverter("_1", DoubleType).apply("1,00") == 100.0)
+ val options = new CSVOptions(Map.empty[String, String], "GMT")
+ assert(parser.makeConverter("_1", FloatType, options = options).apply("1,00") == 100.0)
+ assert(parser.makeConverter("_1", DoubleType, options = options).apply("1,00") == 100.0)
} finally {
Locale.setDefault(originalLocale)
}
}
test("Float NaN values are parsed correctly") {
+ val options = new CSVOptions(Map("nanValue" -> "nn"), "GMT")
val floatVal: Float = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("nanValue", "nn")
+ "_1", FloatType, nullable = true, options = options
).apply("nn").asInstanceOf[Float]
// Java implements the IEEE-754 floating point standard which guarantees that any comparison
@@ -145,36 +155,41 @@ class UnivocityParserSuite extends SparkFunSuite {
}
test("Double NaN values are parsed correctly") {
+ val options = new CSVOptions(Map("nanValue" -> "-"), "GMT")
val doubleVal: Double = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("nanValue", "-")
+ "_1", DoubleType, nullable = true, options = options
).apply("-").asInstanceOf[Double]
assert(doubleVal.isNaN)
}
test("Float infinite values can be parsed") {
+ val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
val floatVal1 = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("negativeInf", "max")
+ "_1", FloatType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Float]
assert(floatVal1 == Float.NegativeInfinity)
+ val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
val floatVal2 = parser.makeConverter(
- "_1", FloatType, nullable = true, CSVOptions("positiveInf", "max")
+ "_1", FloatType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Float]
assert(floatVal2 == Float.PositiveInfinity)
}
test("Double infinite values can be parsed") {
+ val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), "GMT")
val doubleVal1 = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
+ "_1", DoubleType, nullable = true, options = negativeInfOptions
).apply("max").asInstanceOf[Double]
assert(doubleVal1 == Double.NegativeInfinity)
+ val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), "GMT")
val doubleVal2 = parser.makeConverter(
- "_1", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
+ "_1", DoubleType, nullable = true, options = positiveInfOptions
).apply("max").asInstanceOf[Double]
assert(doubleVal2 == Double.PositiveInfinity)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 156fd965b4..9344aeda00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -62,7 +62,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}
- val dummyOption = new JSONOptions(Map.empty[String, String])
+ val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, "", dummyOption)
@@ -1366,7 +1366,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
- val emptySchema = JsonInferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
+ val emptySchema = JsonInferSchema.infer(
+ empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
assert(StructType(Seq()) === emptySchema)
}
@@ -1391,7 +1392,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
- emptyRecords, "", new JSONOptions(Map.empty[String, String]))
+ emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
assert(StructType(Seq()) === emptySchema)
}
@@ -1723,7 +1724,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
- test("Write timestamps correctly with dateFormat option") {
+ test("Write timestamps correctly with timestampFormat option") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
// With dateFormat option.
@@ -1751,6 +1752,43 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("Write timestamps correctly with timestampFormat option and timeZone option") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+ // With dateFormat option and timeZone option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+ val timestampsWithFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+ timestampsWithFormat.write
+ .format("json")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/27 01:00"),
+ Row("2014/10/28 01:30"),
+ Row("2016/01/29 04:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+
+ val readBack = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .option("timeZone", "GMT")
+ .json(timestampsWithFormatPath)
+
+ checkAnswer(readBack, timestampsWithFormat)
+ }
+ }
+
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
val records = sparkContext
.parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 76ffb949f1..9b5e364e51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.sources
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
private def getProvidingClass(name: String): Class[_] =
- DataSource(sparkSession = null, className = name).providingClass
+ DataSource(
+ sparkSession = null,
+ className = name,
+ options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass
test("jdbc") {
assert(