aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2017-02-15 13:26:34 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-15 13:26:34 -0800
commit865b2fd84c6f82de147540c8f17bbe0f0d9fb69c (patch)
tree554bb50328d64206b8ab36eb8d3af1be5e1c7fd1 /python
parent6a9a85b84decc2cbe1a0d8791118a0f91a62aa3f (diff)
downloadspark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.tar.gz
spark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.tar.bz2
spark-865b2fd84c6f82de147540c8f17bbe0f0d9fb69c.zip
[SPARK-18937][SQL] Timezone support in CSV/JSON parsing
## What changes were proposed in this pull request? This is a follow-up pr of #16308. This pr enables timezone support in CSV/JSON parsing. We should introduce `timeZone` option for CSV/JSON datasources (the default value of the option is session local timezone). The datasources should use the `timeZone` option to format/parse to write/read timestamp values. Notice that while reading, if the timestampFormat has the timezone info, the timezone will not be used because we should respect the timezone in the values. For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values written with the default timezone option, which is `"GMT"` because session local timezone is `"GMT"` here, are: ```scala scala> spark.conf.set("spark.sql.session.timeZone", "GMT") scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts") df: org.apache.spark.sql.DataFrame = [ts: timestamp] scala> df.show() +-------------------+ |ts | +-------------------+ |2016-01-01 00:00:00| +-------------------+ scala> df.write.json("/path/to/gmtjson") ``` ```sh $ cat /path/to/gmtjson/part-* {"ts":"2016-01-01T00:00:00.000Z"} ``` whereas setting the option to `"PST"`, they are: ```scala scala> df.write.option("timeZone", "PST").json("/path/to/pstjson") ``` ```sh $ cat /path/to/pstjson/part-* {"ts":"2015-12-31T16:00:00.000-08:00"} ``` We can properly read these files even if the timezone option is wrong because the timestamp values have timezone info: ```scala scala> val schema = new StructType().add("ts", TimestampType) schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true)) scala> spark.read.schema(schema).json("/path/to/gmtjson").show() +-------------------+ |ts | +-------------------+ |2016-01-01 00:00:00| +-------------------+ scala> spark.read.schema(schema).option("timeZone", "PST").json("/path/to/gmtjson").show() +-------------------+ |ts | +-------------------+ |2016-01-01 00:00:00| +-------------------+ ``` And even if `timezoneFormat` doesn't contain timezone info, we can properly read the values with setting correct timezone option: ```scala scala> df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson") ``` ```sh $ cat /path/to/jstjson/part-* {"ts":"2016-01-01T09:00:00"} ``` ```scala // wrong result scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").json("/path/to/jstjson").show() +-------------------+ |ts | +-------------------+ |2016-01-01 09:00:00| +-------------------+ // correct result scala> spark.read.schema(schema).option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss").option("timeZone", "JST").json("/path/to/jstjson").show() +-------------------+ |ts | +-------------------+ |2016-01-01 00:00:00| +-------------------+ ``` This pr also makes `JsonToStruct` and `StructToJson` `TimeZoneAwareExpression` to be able to evaluate values with timezone option. ## How was this patch tested? Existing tests and added some tests. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #16750 from ueshin/issues/SPARK-18937.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql/readwriter.py43
-rw-r--r--python/pyspark/sql/streaming.py20
2 files changed, 39 insertions, 24 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d31f3fb8f6..1678334889 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ class DataFrameReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""
Loads a JSON file (`JSON Lines text format or newline-delimited JSON
<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ class DataFrameReader(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -225,7 +228,7 @@ class DataFrameReader(OptionUtils):
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -298,7 +301,7 @@ class DataFrameReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ class DataFrameReader(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ class DataFrameReader(OptionUtils):
uses the default value, ``10``.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ class DataFrameReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -591,7 +596,8 @@ class DataFrameWriter(OptionUtils):
self._jwrite.saveAsTable(name)
@since(1.4)
- def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ class DataFrameWriter(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
- compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+ timeZone=timeZone)
self._jwrite.json(path)
@since(1.4)
@@ -664,7 +673,7 @@ class DataFrameWriter(OptionUtils):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ class DataFrameWriter(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
- dateFormat=dateFormat, timestampFormat=timestampFormat)
+ dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
self._jwrite.csv(path)
@since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a10b185cd4..d988e596a8 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ class DataStreamReader(OptionUtils):
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""
Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
<http://jsonlines.org/>`_) and returns a :class`DataFrame`.
@@ -476,11 +476,13 @@ class DataStreamReader(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ class DataStreamReader(OptionUtils):
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -552,7 +554,7 @@ class DataStreamReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -597,11 +599,11 @@ class DataStreamReader(OptionUtils):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -609,6 +611,8 @@ class DataStreamReader(OptionUtils):
``-1`` meaning unlimited length.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ class DataStreamReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else: