aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--python/pyspark/sql/streaming.py30
1 files changed, 22 insertions, 8 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index a364555003..3761d2b199 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -338,7 +338,8 @@ class DataStreamReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
+ timestampFormat=None):
"""
Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
@@ -381,6 +382,14 @@ class DataStreamReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to date type. If None is set, it uses the
+ default value value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -393,7 +402,8 @@ class DataStreamReader(OptionUtils):
allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
- mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
+ timestampFormat=timestampFormat)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -450,8 +460,8 @@ class DataStreamReader(OptionUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
- maxMalformedLogPerPartition=None, mode=None):
+ negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -494,9 +504,12 @@ class DataStreamReader(OptionUtils):
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
- applies to both date type and timestamp type. By default, it is None
- which means trying to parse times and date by
- ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ applies to date type. If None is set, it uses the
+ default value value, ``yyyy-MM-dd``.
+ :param timestampFormat: sets the string that indicates a timestamp format. Custom date
+ formats follow the formats at ``java.text.SimpleDateFormat``.
+ This applies to timestamp type. If None is set, it uses the
+ default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -521,7 +534,8 @@ class DataStreamReader(OptionUtils):
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
- dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
+ maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))