about summary refs log tree commit diff
path: root/python/pyspark/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-06-21 10:47:51 -0700
committerReynold Xin <rxin@databricks.com>2016-06-21 10:47:51 -0700
commit93338807aafdb2db9fb036ceadee1467cd367cdd (patch)
tree1a12d06363c9e879f22286713c7e2a99442e34d5 /python/pyspark/sql
parent4f83ca1059a3b580fca3f006974ff5ac4d5212a1 (diff)
downloadspark-93338807aafdb2db9fb036ceadee1467cd367cdd.tar.gz
spark-93338807aafdb2db9fb036ceadee1467cd367cdd.tar.bz2
spark-93338807aafdb2db9fb036ceadee1467cd367cdd.zip
[SPARK-13792][SQL] Addendum: Fix Python API
## What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/13795 to properly set CSV options in the Python API. As part of this, I also make the Python option setting for both CSV and JSON more robust against positional errors.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #13800 from rxin/SPARK-13792-2.
Diffstat (limited to 'python/pyspark/sql')
-rw-r--r-- python/pyspark/sql/readwriter.py | 54
1 file changed, 33 insertions(+), 21 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 89506ca02f..ccbf895c2d 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -77,7 +77,7 @@ class ReaderUtils(object):
def _set_csv_opts(self, schema, sep, encoding, quote, escape,
comment, header, inferSchema, ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode):
+ dateFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode):
"""
Set options based on the CSV optional parameters
"""
@@ -115,6 +115,8 @@ class ReaderUtils(object):
self.option("maxColumns", maxColumns)
if maxCharsPerColumn is not None:
self.option("maxCharsPerColumn", maxCharsPerColumn)
+ if maxMalformedLogPerPartition is not None:
+ self.option("maxMalformedLogPerPartition", maxMalformedLogPerPartition)
if mode is not None:
self.option("mode", mode)
@@ -268,10 +270,12 @@ class DataFrameReader(ReaderUtils):
[('age', 'bigint'), ('name', 'string')]
"""
- self._set_json_opts(schema, primitivesAsString, prefersDecimal,
- allowComments, allowUnquotedFieldNames, allowSingleQuotes,
- allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
- mode, columnNameOfCorruptRecord)
+ self._set_json_opts(
+ schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+ allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+ allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+ allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -343,7 +347,8 @@ class DataFrameReader(ReaderUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+ maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -408,11 +413,13 @@ class DataFrameReader(ReaderUtils):
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
"""
-
- self._set_csv_opts(schema, sep, encoding, quote, escape,
- comment, header, inferSchema, ignoreLeadingWhiteSpace,
- ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode)
+ self._set_csv_opts(
+ schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+ header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+ nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+ dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
@@ -958,10 +965,12 @@ class DataStreamReader(ReaderUtils):
>>> json_sdf.schema == sdf_schema
True
"""
- self._set_json_opts(schema, primitivesAsString, prefersDecimal,
- allowComments, allowUnquotedFieldNames, allowSingleQuotes,
- allowNumericLeadingZero, allowBackslashEscapingAnyCharacter,
- mode, columnNameOfCorruptRecord)
+ self._set_json_opts(
+ schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal,
+ allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames,
+ allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
+ allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+ mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
@@ -1019,7 +1028,8 @@ class DataStreamReader(ReaderUtils):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
- negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, mode=None):
+ negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None,
+ maxMalformedLogPerPartition=None, mode=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -1085,11 +1095,13 @@ class DataStreamReader(ReaderUtils):
>>> csv_sdf.schema == sdf_schema
True
"""
-
- self._set_csv_opts(schema, sep, encoding, quote, escape,
- comment, header, inferSchema, ignoreLeadingWhiteSpace,
- ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf,
- dateFormat, maxColumns, maxCharsPerColumn, mode)
+ self._set_csv_opts(
+ schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
+ header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
+ nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
+ dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else: