Diffstat (limited to 'python/pyspark')
-rw-r--r--  python/pyspark/sql/readwriter.py  32
-rw-r--r--  python/pyspark/sql/streaming.py   32
2 files changed, 48 insertions, 16 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 6bed390e60..b5e5b18bcb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -191,10 +191,13 @@ class DataFrameReader(OptionUtils):
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, a user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in a user-defined \
+ schema. If a schema does not have the field, it drops corrupt records during \
+ parsing. When inferring a schema, it implicitly adds a \
+ ``columnNameOfCorruptRecord`` field to the output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
@@ -304,7 +307,8 @@ class DataFrameReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
+ columnNameOfCorruptRecord=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -366,11 +370,22 @@ class DataFrameReader(OptionUtils):
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
- When a schema is set by user, it sets ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, a user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in a \
+ user-defined schema. If a schema does not have the field, it drops corrupt \
+ records during parsing. When the length of parsed CSV tokens is shorter than \
+ the expected length of the schema, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
+ :param columnNameOfCorruptRecord: allows renaming the new field that stores the malformed
+ string created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
@@ -382,7 +397,8 @@ class DataFrameReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
+ columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
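
For context, a minimal sketch of how the batch CSV reader could be called once this patch is applied. The input path, the schema, and the ``_corrupt`` column name below are placeholders for illustration, not part of the change.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.getOrCreate()

    # Declare a string field in the user-defined schema to keep corrupt records,
    # and point columnNameOfCorruptRecord at it (this overrides the
    # spark.sql.columnNameOfCorruptRecord configuration).
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("_corrupt", StringType(), True),   # hypothetical corrupt-record column
    ])

    df = spark.read.csv(
        "/tmp/people.csv",                    # hypothetical input path
        schema=schema,
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="_corrupt",
    )
    # Malformed lines end up in `_corrupt` with the other fields set to null;
    # without such a field in the schema they would be dropped during parsing.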
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 965c8f6b26..bd19fd4e38 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -463,10 +463,13 @@ class DataStreamReader(OptionUtils):
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, a user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in a user-defined \
+ schema. If a schema does not have the field, it drops corrupt records during \
+ parsing. When inferring a schema, it implicitly adds a \
+ ``columnNameOfCorruptRecord`` field to the output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
@@ -558,7 +561,8 @@ class DataStreamReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
+ columnNameOfCorruptRecord=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -618,11 +622,22 @@ class DataStreamReader(OptionUtils):
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
- When a schema is set by user, it sets ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, a user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in a \
+ user-defined schema. If a schema does not have the field, it drops corrupt \
+ records during parsing. When the length of parsed CSV tokens is shorter than \
+ the expected length of the schema, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
+ :param columnNameOfCorruptRecord: allows renaming the new field that stores the malformed
+ string created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
True
@@ -636,7 +651,8 @@ class DataStreamReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
+ columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
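
Similarly, a rough sketch of the same option on the streaming CSV reader, assuming the patch is applied. The monitored directory, schema, and ``_corrupt`` column name are illustrative placeholders; file-stream sources require a user-specified schema.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.getOrCreate()

    schema = StructType([
        StructField("value", StringType(), True),
        StructField("_corrupt", StringType(), True),  # holds malformed lines in PERMISSIVE mode
    ])

    csv_sdf = spark.readStream.csv(
        "/tmp/csv-input-dir",                 # hypothetical monitored directory
        schema=schema,
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="_corrupt",
    )
    assert csv_sdf.isStreaming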