diff options
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 32 | ||||
-rw-r--r-- | python/pyspark/sql/streaming.py | 32 |
2 files changed, 48 insertions, 16 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6bed390e60..b5e5b18bcb 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -191,10 +191,13 @@ class DataFrameReader(OptionUtils): :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -304,7 +307,8 @@ class DataFrameReader(OptionUtils): comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -366,11 +370,22 @@ class DataFrameReader(OptionUtils): :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. + >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes [('_c0', 'string'), ('_c1', 'string')] @@ -382,7 +397,8 @@ class DataFrameReader(OptionUtils): nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): path = [path] return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 965c8f6b26..bd19fd4e38 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -463,10 +463,13 @@ class DataStreamReader(OptionUtils): :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -558,7 +561,8 @@ class DataStreamReader(OptionUtils): comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -618,11 +622,22 @@ class DataStreamReader(OptionUtils): :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. + >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming True @@ -636,7 +651,8 @@ class DataStreamReader(OptionUtils): nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: |