aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorTakeshi Yamamuro <yamamuro@apache.org>2017-02-23 12:09:36 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-23 12:09:36 -0800
commit09ed6e7711d0758c24944516a263b8bd4e1728fc (patch)
tree14decfedc993886ff382f9313f042053dc564f48 /python/pyspark
parent9bf4e2baad0e2851da554d85223ffaa029cfa490 (diff)
downloadspark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.gz
spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.bz2
spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.zip
[SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data
## What changes were proposed in this pull request? This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails); ``` Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at scala.util.Try.getOrElse(Try.scala:79) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269) at ``` In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field. ## How was this patch tested? Added tests in `CSVSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #16928 from maropu/SPARK-18699-2.
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/sql/readwriter.py32
-rw-r--r--python/pyspark/sql/streaming.py32
2 files changed, 48 insertions, 16 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 6bed390e60..b5e5b18bcb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -191,10 +191,13 @@ class DataFrameReader(OptionUtils):
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
+ schema. If a schema does not have the field, it drops corrupt records during \
+ parsing. When inferring a schema, it implicitly adds a \
+ ``columnNameOfCorruptRecord`` field in an output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
@@ -304,7 +307,8 @@ class DataFrameReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
+ columnNameOfCorruptRecord=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -366,11 +370,22 @@ class DataFrameReader(OptionUtils):
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
- When a schema is set by user, it sets ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in an \
+ user-defined schema. If a schema does not have the field, it drops corrupt \
+ records during parsing. When a length of parsed CSV tokens is shorter than \
+ an expected length of a schema, it sets `null` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
@@ -382,7 +397,8 @@ class DataFrameReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
+ columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 965c8f6b26..bd19fd4e38 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -463,10 +463,13 @@ class DataStreamReader(OptionUtils):
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
- record and puts the malformed string into a new field configured by \
- ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
- ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
+ schema. If a schema does not have the field, it drops corrupt records during \
+ parsing. When inferring a schema, it implicitly adds a \
+ ``columnNameOfCorruptRecord`` field in an output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
@@ -558,7 +561,8 @@ class DataStreamReader(OptionUtils):
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
+ columnNameOfCorruptRecord=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -618,11 +622,22 @@ class DataStreamReader(OptionUtils):
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
- When a schema is set by user, it sets ``null`` for extra fields.
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+ record, and puts the malformed string into a field configured by \
+ ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+ a string type field named ``columnNameOfCorruptRecord`` in an \
+ user-defined schema. If a schema does not have the field, it drops corrupt \
+ records during parsing. When a length of parsed CSV tokens is shorter than \
+ an expected length of a schema, it sets `null` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+ ``spark.sql.columnNameOfCorruptRecord``.
+
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
True
@@ -636,7 +651,8 @@ class DataStreamReader(OptionUtils):
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
+ columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else: