diff options
author | Takeshi Yamamuro <yamamuro@apache.org> | 2017-02-23 12:09:36 -0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-02-23 12:09:36 -0800 |
commit | 09ed6e7711d0758c24944516a263b8bd4e1728fc (patch) | |
tree | 14decfedc993886ff382f9313f042053dc564f48 /python/pyspark | |
parent | 9bf4e2baad0e2851da554d85223ffaa029cfa490 (diff) | |
download | spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.gz spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.bz2 spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.zip |
[SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data
## What changes were proposed in this pull request?
This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails);
```
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
at
```
In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field.
## How was this patch tested?
Added tests in `CSVSuite`.
Author: Takeshi Yamamuro <yamamuro@apache.org>
Closes #16928 from maropu/SPARK-18699-2.
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 32 | ||||
-rw-r--r-- | python/pyspark/sql/streaming.py | 32 |
2 files changed, 48 insertions, 16 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 6bed390e60..b5e5b18bcb 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -191,10 +191,13 @@ class DataFrameReader(OptionUtils): :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -304,7 +307,8 @@ class DataFrameReader(OptionUtils): comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -366,11 +370,22 @@ class DataFrameReader(OptionUtils): :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. + >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes [('_c0', 'string'), ('_c1', 'string')] @@ -382,7 +397,8 @@ class DataFrameReader(OptionUtils): nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): path = [path] return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path))) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 965c8f6b26..bd19fd4e38 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -463,10 +463,13 @@ class DataStreamReader(OptionUtils): :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ + schema. If a schema does not have the field, it drops corrupt records during \ + parsing. When inferring a schema, it implicitly adds a \ + ``columnNameOfCorruptRecord`` field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -558,7 +561,8 @@ class DataStreamReader(OptionUtils): comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None): + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + columnNameOfCorruptRecord=None): """Loads a CSV file stream and returns the result as a :class:`DataFrame`. This function will go through the input once to determine the input schema if @@ -618,11 +622,22 @@ class DataStreamReader(OptionUtils): :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record, and puts the malformed string into a field configured by \ + ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ + a string type field named ``columnNameOfCorruptRecord`` in an \ + user-defined schema. If a schema does not have the field, it drops corrupt \ + records during parsing. When a length of parsed CSV tokens is shorter than \ + an expected length of a schema, it sets `null` for extra fields. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. + >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema) >>> csv_sdf.isStreaming True @@ -636,7 +651,8 @@ class DataStreamReader(OptionUtils): nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone) + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + columnNameOfCorruptRecord=columnNameOfCorruptRecord) if isinstance(path, basestring): return self._df(self._jreader.csv(path)) else: |