diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-08-24 22:16:20 +0200 |
---|---|---|
committer | Herman van Hovell <hvanhovell@databricks.com> | 2016-08-24 22:16:20 +0200 |
commit | 29952ed096fd2a0a19079933ff691671d6f00835 (patch) | |
tree | 88846c8853b674b098e0c85ee1c462a8b99fb93a /python/pyspark/sql/streaming.py | |
parent | 891ac2b914fb6f90a62c6fbc0a3960a89d1c1d92 (diff) | |
download | spark-29952ed096fd2a0a19079933ff691671d6f00835.tar.gz spark-29952ed096fd2a0a19079933ff691671d6f00835.tar.bz2 spark-29952ed096fd2a0a19079933ff691671d6f00835.zip |
[SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON
## What changes were proposed in this pull request?
### Default - ISO 8601
Currently, CSV datasource is writing `Timestamp` and `Date` as numeric form and JSON datasource is writing both as below:
- CSV
```
// TimestampType
1414459800000000
// DateType
16673
```
- Json
```
// TimestampType
1970-01-01 11:46:40.0
// DateType
1970-01-01
```
So, for CSV we can't read back what we write and for JSON it becomes ambiguous because the timezone is being missed.
So, this PR make both **write** `Timestamp` and `Date` in ISO 8601 formatted string (please refer the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).
- For `Timestamp` it becomes as below: (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`)
```
1970-01-01T02:00:01.000-01:00
```
- For `Date` it becomes as below (`yyyy-MM-dd`)
```
1970-01-01
```
### Custom date format option - `dateFormat`
This PR also adds the support to write and read dates and timestamps in a formatted string as below:
- **DateType**
- With `dateFormat` option (e.g. `yyyy/MM/dd`)
```
+----------+
| date|
+----------+
|2015/08/26|
|2014/10/27|
|2016/01/28|
+----------+
```
### Custom date format option - `timestampFormat`
- **TimestampType**
- With `dateFormat` option (e.g. `dd/MM/yyyy HH:mm`)
```
+----------------+
| date|
+----------------+
|2015/08/26 18:00|
|2014/10/27 18:30|
|2016/01/28 20:00|
+----------------+
```
## How was this patch tested?
Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r-- | python/pyspark/sql/streaming.py | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index a364555003..3761d2b199 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None, + timestampFormat=None): """ Loads a JSON file stream (one object per line) and returns a :class`DataFrame`. @@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils): ``spark.sql.columnNameOfCorruptRecord``. If None is set, it uses the value specified in ``spark.sql.columnNameOfCorruptRecord``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming @@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils): allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, + timestampFormat=timestampFormat) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: @@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils): def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None): + negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils): is set, it uses the default value, ``Inf``. :param dateFormat: sets the string that indicates a date format. Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This - applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. + applies to date type. If None is set, it uses the + default value value, ``yyyy-MM-dd``. + :param timestampFormat: sets the string that indicates a timestamp format. Custom date + formats follow the formats at ``java.text.SimpleDateFormat``. + This applies to timestamp type. If None is set, it uses the + default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. :param maxColumns: defines a hard limit of how many columns a record can have. If None is set, it uses the default value, ``20480``. :param maxCharsPerColumn: defines the maximum number of characters allowed for any given @@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils): header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, + maxCharsPerColumn=maxCharsPerColumn, maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) |