aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-08-24 22:16:20 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-08-24 22:16:20 +0200
commit29952ed096fd2a0a19079933ff691671d6f00835 (patch)
tree88846c8853b674b098e0c85ee1c462a8b99fb93a /python
parent891ac2b914fb6f90a62c6fbc0a3960a89d1c1d92 (diff)
downloadspark-29952ed096fd2a0a19079933ff691671d6f00835.tar.gz
spark-29952ed096fd2a0a19079933ff691671d6f00835.tar.bz2
spark-29952ed096fd2a0a19079933ff691671d6f00835.zip
[SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON
## What changes were proposed in this pull request?

### Default - ISO 8601

Currently, the CSV datasource writes `Timestamp` and `Date` in numeric form, and the JSON datasource writes both as below:

- CSV

  ```
  // TimestampType
  1414459800000000
  // DateType
  16673
  ```

- JSON

  ```
  // TimestampType
  1970-01-01 11:46:40.0
  // DateType
  1970-01-01
  ```

So, for CSV we can't read back what we write, and for JSON the value is ambiguous because the timezone is missing.

So, this PR makes both datasources **write** `Timestamp` and `Date` as ISO 8601 formatted strings (please refer to the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).

- For `Timestamp` it becomes as below (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`):

  ```
  1970-01-01T02:00:01.000-01:00
  ```

- For `Date` it becomes as below (`yyyy-MM-dd`):

  ```
  1970-01-01
  ```

### Custom date format option - `dateFormat`

This PR also adds support for writing and reading dates and timestamps as formatted strings, as below:

- **DateType**

  - With `dateFormat` option (e.g. `yyyy/MM/dd`)

    ```
    +----------+
    |      date|
    +----------+
    |2015/08/26|
    |2014/10/27|
    |2016/01/28|
    +----------+
    ```

### Custom date format option - `timestampFormat`

- **TimestampType**

  - With `timestampFormat` option (e.g. `yyyy/MM/dd HH:mm`)

    ```
    +----------------+
    |            date|
    +----------------+
    |2015/08/26 18:00|
    |2014/10/27 18:30|
    |2016/01/28 20:00|
    +----------------+
    ```

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/readwriter.py56
-rw-r--r--python/pyspark/sql/streaming.py30
2 files changed, 66 insertions, 20 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 64de33e8ec..3da6f497e9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -156,7 +156,7 @@ class DataFrameReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
"""
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
(one object per record) and returns the result as a :class:`DataFrame`.
@@ -198,6 +198,14 @@ class DataFrameReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -213,7 +221,8 @@ class DataFrameReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -285,8 +294,8 @@ class DataFrameReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -327,9 +336,12 @@ class DataFrameReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -356,7 +368,8 @@ class DataFrameReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
@@ -571,7 +584,7 @@ class DataFrameWriter(OptionUtils):
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -584,11 +597,20 @@ class DataFrameWriter(OptionUtils):
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
- self._set_opts(compression=compression)
+ self._set_opts(
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
@since(1.4)
@@ -634,7 +656,8 @@ class DataFrameWriter(OptionUtils):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
- header=None, nullValue=None, escapeQuotes=None, quoteAll=None):
+ header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
+ timestampFormat=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -666,12 +689,21 @@ class DataFrameWriter(OptionUtils):
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
- nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll)
+ nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
+ dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.csv(path)
@since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555003..3761d2b199 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+ timestampFormat=None):
"""
Loads a JSON file stream (one object per line) and returns a :class:`DataFrame`.
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))