author    hyukjinkwon <gurwls223@gmail.com>    2016-08-24 22:16:20 +0200
committer Herman van Hovell <hvanhovell@databricks.com>    2016-08-24 22:16:20 +0200
commit    29952ed096fd2a0a19079933ff691671d6f00835 (patch)
tree      88846c8853b674b098e0c85ee1c462a8b99fb93a
parent    891ac2b914fb6f90a62c6fbc0a3960a89d1c1d92 (diff)
[SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON
## What changes were proposed in this pull request?

### Default - ISO 8601

Currently, the CSV datasource writes `Timestamp` and `Date` in numeric form, and the JSON datasource writes both as below:

- CSV

```
// TimestampType
1414459800000000
// DateType
16673
```

- JSON

```
// TimestampType
1970-01-01 11:46:40.0
// DateType
1970-01-01
```

So, for CSV we can't read back what we write, and for JSON it becomes ambiguous because the timezone is missing. This PR makes both **write** `Timestamp` and `Date` as ISO 8601 formatted strings (please refer to the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).

- For `Timestamp`, it becomes as below (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`):

```
1970-01-01T02:00:01.000-01:00
```

- For `Date`, it becomes as below (`yyyy-MM-dd`):

```
1970-01-01
```

### Custom date format option - `dateFormat`

This PR also adds support for writing and reading dates and timestamps in a formatted string, as below:

- **DateType** with the `dateFormat` option (e.g. `yyyy/MM/dd`):

```
+----------+
|      date|
+----------+
|2015/08/26|
|2014/10/27|
|2016/01/28|
+----------+
```

### Custom timestamp format option - `timestampFormat`

- **TimestampType** with the `timestampFormat` option (e.g. `dd/MM/yyyy HH:mm`):

```
+----------------+
|            date|
+----------------+
|2015/08/26 18:00|
|2014/10/27 18:30|
|2016/01/28 20:00|
+----------------+
```

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
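For illustration, here is a minimal usage sketch of the new options from the Scala API (not part of this patch); the input and output paths and the column contents are placeholders:

```
import org.apache.spark.sql.SparkSession

object DateFormatOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dateFormat-example").getOrCreate()

    // Read a CSV file whose timestamp column is formatted as "dd/MM/yyyy HH:mm".
    // After this patch, `timestampFormat` governs timestamp parsing while
    // `dateFormat` governs date parsing; previously a single `dateFormat`
    // option covered both.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("timestampFormat", "dd/MM/yyyy HH:mm")
      .csv("/tmp/dates.csv") // placeholder input path

    // Write timestamps back out in a custom format. Without these options,
    // timestamps are now written as ISO 8601 strings (yyyy-MM-dd'T'HH:mm:ss.SSSZZ)
    // and dates as yyyy-MM-dd by default.
    df.write
      .option("header", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm")
      .csv("/tmp/timestamps-out") // placeholder output path

    // The same options apply to the JSON datasource.
    df.write
      .option("dateFormat", "yyyy/MM/dd")
      .json("/tmp/dates-out") // placeholder output path

    spark.stop()
  }
}
```

Both datasources build `FastDateFormat` instances (a thread-safe replacement for `SimpleDateFormat`) from these options, and parsing falls back to the behaviour used in 2.0 and 1.x when a value does not match the pattern, as the hunks below show.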
-rw-r--r--  python/pyspark/sql/readwriter.py  56
-rw-r--r--  python/pyspark/sql/streaming.py  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala  42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala  15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala  43
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala  19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  157
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala  17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala  67
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala  4
18 files changed, 454 insertions, 90 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e8ec..3da6f497e9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
"""
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
(one object per record) and returns the result as a :class`DataFrame`.
@@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
@@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils):
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils):
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
- self._set_opts(compression=compression)
+ self._set_opts(
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
@since(1.4)
@@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
- header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+ header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+ timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils):
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
- nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+ nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+ dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.csv(path)
@since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555003..3761d2b199 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+ timestampFormat=None):
"""
Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e23dacc7a1..c060091c7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -280,7 +280,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* </ul>
+ *
* @since 2.0.0
*/
@scala.annotation.varargs
@@ -374,10 +381,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* value.</li>
* <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
* value.</li>
- * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
- * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
- * and timestamp type. By default, it is `null` which means trying to parse times and date by
- * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type. When parsing with this
+ * format fails, it falls back to `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`
+ * or the ISO 8601 format.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 44a9f312bd..a9049a60f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -483,6 +483,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 1.4.0
*/
@@ -575,6 +581,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index de3d889621..f1b4c11878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -139,20 +139,14 @@ private[csv] object CSVInferSchema {
}
private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
- if (options.dateFormat != null) {
- // This case infers a custom `dataFormat` is set.
- if ((allCatch opt options.dateFormat.parse(field)).isDefined) {
- TimestampType
- } else {
- tryParseBoolean(field, options)
- }
- } else {
+ // First, tries to parse the field with the custom (or default) `timestampFormat`.
+ if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+ TimestampType
+ } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
// We keep this for backwards compatibility.
- if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
- TimestampType
- } else {
- tryParseBoolean(field, options)
- }
+ TimestampType
+ } else {
+ tryParseBoolean(field, options)
}
}
@@ -277,18 +271,24 @@ private[csv] object CSVTypeCast {
val value = new BigDecimal(datum.replaceAll(",", ""))
Decimal(value, dt.precision, dt.scale)
}
- case _: TimestampType if options.dateFormat != null =>
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- options.dateFormat.parse(datum).getTime * 1000L
case _: TimestampType =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(datum).getTime * 1000L
- case _: DateType if options.dateFormat != null =>
- DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime)
+ Try(options.timestampFormat.parse(datum).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(datum).getTime * 1000L
+ }
case _: DateType =>
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
+ Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+ }
case _: StringType => UTF8String.fromString(datum)
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 10fe541a2c..364d7c831e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
-import java.text.SimpleDateFormat
+
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -101,11 +102,13 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
name.map(CompressionCodecs.getCodecClassName)
}
- // Share date format object as it is expensive to parse date pattern.
- val dateFormat: SimpleDateFormat = {
- val dateFormat = parameters.get("dateFormat")
- dateFormat.map(new SimpleDateFormat(_)).orNull
- }
+ // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
+ val dateFormat: FastDateFormat =
+ FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+ val timestampFormat: FastDateFormat =
+ FastDateFormat.getInstance(
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
val maxColumns = getInt("maxColumns", 20480)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index de2d633c0b..33b170bc31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
import org.apache.spark.sql.types._
@@ -187,6 +188,14 @@ private[csv] class CsvOutputWriter(
// create the Generator without separator inserted between 2 records
private[this] val text = new Text()
+ // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+ // When the value is null, this converter should not be called.
+ private type ValueConverter = (InternalRow, Int) => String
+
+ // `ValueConverter`s for all values in the fields of the schema
+ private val valueConverters: Array[ValueConverter] =
+ dataSchema.map(_.dataType).map(makeConverter).toArray
+
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
@@ -203,18 +212,40 @@ private[csv] class CsvOutputWriter(
private var records: Long = 0L
private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
- private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
- if (field != null) {
- field.toString
- } else {
- params.nullValue
+ private def rowToString(row: InternalRow): Seq[String] = {
+ var i = 0
+ val values = new Array[String](row.numFields)
+ while (i < row.numFields) {
+ if (!row.isNullAt(i)) {
+ values(i) = valueConverters(i).apply(row, i)
+ } else {
+ values(i) = params.nullValue
+ }
+ i += 1
}
+ values
+ }
+
+ private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ case DateType =>
+ (row: InternalRow, ordinal: Int) =>
+ params.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+
+ case TimestampType =>
+ (row: InternalRow, ordinal: Int) =>
+ params.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+
+ case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
+
+ case dt: DataType =>
+ (row: InternalRow, ordinal: Int) =>
+ row.get(ordinal, dt).toString
}
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
+ csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag)
records += 1
if (records % FLUSH_BATCH_SIZE == 0) {
flush()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 66f1126fb9..02d211d042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
@@ -53,6 +54,14 @@ private[sql] class JSONOptions(
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+ // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
+ val dateFormat: FastDateFormat =
+ FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+ val timestampFormat: FastDateFormat =
+ FastDateFormat.getInstance(
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 23f4a55491..270e7fbd3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -26,7 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._
-private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
+private[sql] class JacksonGenerator(
+ schema: StructType,
+ writer: Writer,
+ options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
// JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
@@ -76,11 +79,15 @@ private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
case TimestampType =>
(row: SpecializedGetters, ordinal: Int) =>
- gen.writeString(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString)
+ val timestampString =
+ options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+ gen.writeString(timestampString)
case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
- gen.writeString(DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString)
+ val dateString =
+ options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+ gen.writeString(dateString)
case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 4ae9376b5a..359a3e2aa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.ByteArrayOutputStream
import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
import com.fasterxml.jackson.core._
@@ -204,7 +205,12 @@ class JacksonParser(
case VALUE_STRING =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ }
case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
@@ -214,13 +220,18 @@ class JacksonParser(
(parser: JsonParser) => parseJsonToken(parser, dataType) {
case VALUE_STRING =>
val stringValue = parser.getText
- if (stringValue.contains("-")) {
- // The format of this string will probably be "yyyy-mm-dd".
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
- } else {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
+ Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+ .getOrElse {
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 16150b91d6..7421314df7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -83,7 +83,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new JsonOutputWriter(path, bucketId, dataSchema, context)
+ new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
}
}
}
@@ -149,6 +149,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
private[json] class JsonOutputWriter(
path: String,
+ options: JSONOptions,
bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext)
@@ -156,7 +157,7 @@ private[json] class JsonOutputWriter(
private[this] val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- private[this] val gen = new JacksonGenerator(dataSchema, writer)
+ private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
private[this] val result = new Text()
private val recordWriter: RecordWriter[NullWritable, Text] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2e606b21bd..3ad1125229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -186,6 +186,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
*
* @since 2.0.0
*/
@@ -228,10 +234,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* value.</li>
* <li>`negativeInf` (default `-Inf`): sets the string representation of a negative infinity
* value.</li>
- * <li>`dateFormat` (default `null`): sets the string that indicates a date format. Custom date
- * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type
- * and timestamp type. By default, it is `null` which means trying to parse times and date by
- * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.</li>
+ * <li>`dateFormat` (default `yyyy-MM-dd`): sets the string that indicates a date format.
+ * Custom date formats follow the formats at `java.text.SimpleDateFormat`. This applies to
+ * date type.</li>
+ * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
+ * indicates a timestamp format. Custom date formats follow the formats at
+ * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
@@ -258,7 +266,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
- * whether we should merge schemas collected from all Parquet part-files. This will override
+ * whether we should merge schemas collected from all
+ * Parquet part-files. This will override
* `spark.sql.parquet.mergeSchema`.</li>
*
* @since 2.0.0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af49c9..5e00f669b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Timestamp field types are inferred correctly via custom data format") {
- var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+ var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
- options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+ options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 8cd76ddf20..2befad6d72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,11 +22,13 @@ import java.nio.charset.UnsupportedCharsetException
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -477,7 +479,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val options = Map(
"header" -> "true",
"inferSchema" -> "true",
- "dateFormat" -> "dd/MM/yyyy hh:mm")
+ "timestampFormat" -> "dd/MM/yyyy HH:mm")
val results = spark.read
.format("csv")
.options(options)
@@ -485,7 +487,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.select("date")
.collect()
- val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm")
+ val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm")
val expected =
Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)),
Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)),
@@ -691,4 +693,155 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(cars, withHeader = true, checkValues = false)
}
+
+ test("Write timestamps correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("inferSchema", "true")
+ .option("header", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ // This will load back the timestamps as string.
+ val iso8601Timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601timestampsPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+ val expectedTimestamps = timestamps.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8501.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601Timestamps, expectedTimestamps)
+ }
+ }
+
+ test("Write dates correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+ val dates = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ dates.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601datesPath)
+
+ // This will load back the dates as string.
+ val iso8601dates = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601datesPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd")
+ val expectedDates = dates.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8501.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601dates, expectedDates)
+ }
+ }
+
+ test("Roundtrip in reading and writing timestamps") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(testFile(datesFile))
+
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ val iso8601timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(iso8601timestampsPath)
+
+ checkAnswer(iso8601timestamps, timestamps)
+ }
+ }
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
+ val datesWithFormat = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ datesWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringDatesWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+ val timestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestampsWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringTimestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b24ef..3ce643e667 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
assert(CSVTypeCast.castTo("true", BooleanType) == true)
- val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+ val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
val customTimestamp = "31/01/2015 00:00"
- val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
- assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
- expectedTime * 1000L)
- assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
- DateTimeUtils.millisToDays(expectedTime))
+ val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+ val castedTimestamp =
+ CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+ assert(castedTimestamp == expectedTime * 1000L)
+
+ val customDate = "31/01/2015"
+ val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+ val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+ val castedDate = CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, dateOptions)
+ assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
val timestamp = "2015-01-01 00:00:00"
assert(CSVTypeCast.castTo(timestamp, TimestampType) ==
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 342fd3e82e..63a9061210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -101,15 +101,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+ val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
enforceCorrectType(ISO8601Time1, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(3601000),
- enforceCorrectType(ISO8601Time1, DateType))
- val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
enforceCorrectType(ISO8601Time2, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(10801000),
- enforceCorrectType(ISO8601Time2, DateType))
+
+ val ISO8601Date = "1970-01-01"
+ checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+ enforceCorrectType(ISO8601Date, DateType))
}
test("Get compatible type") {
@@ -1664,4 +1664,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(df.schema.size === 2)
df.collect()
}
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
+ val datesWithFormat = spark.read
+ .schema(customSchema)
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+
+ datesWithFormat.write
+ .format("json")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringDatesWithFormat = spark.read
+ .schema(stringSchema)
+ .json(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+ val timestampsWithFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+ timestampsWithFormat.write
+ .format("json")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index f4a3336643..a400940db9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -222,6 +222,12 @@ private[json] trait TestJsonData {
spark.sparkContext.parallelize(
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+ def datesRecords: RDD[String] =
+ spark.sparkContext.parallelize(
+ """{"date": "26/08/2015 18:00"}""" ::
+ """{"date": "27/10/2014 18:30"}""" ::
+ """{"date": "28/01/2016 20:00"}""" :: Nil)
+
lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index d79edee5b1..52486b122a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -32,6 +32,10 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: NullType => false
case _: BinaryType => false
+ // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
+ // in `DateTimeUtils` parses the formatted string wrongly when the date is
+ // too early. (e.g. "1600-07-13T08:36:32.847").
+ case _: TimestampType => false
case _: CalendarIntervalType => false
case _ => true
}