author    hyukjinkwon <gurwls223@gmail.com>    2016-05-02 17:50:40 -0700
committer Reynold Xin <rxin@databricks.com>   2016-05-02 17:50:40 -0700
commit    d37c7f7f042f7943b5b684e53cf4284c601fb347 (patch)
tree      77c16f702925a8d33581ffd124b4993e27fcbf06 /python
parent    35d9c8aa69c650f33037813607dc939922c5fc27 (diff)
[SPARK-15050][SQL] Put CSV and JSON options as Python csv and json function parameters
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-15050

This PR adds function parameters for the Python API for reading and writing `csv()`.

## How was this patch tested?

This was tested by `./dev/run_tests`.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>

Closes #12834 from HyukjinKwon/SPARK-15050.
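To see the shape of the change, here is a hedged before/after sketch, assuming a live `SQLContext` named `sqlContext` as in the doctests below (the option value and path are illustrative):

```python
# Before: JSON/CSV options are chained through .option() with string keys.
df_old = (sqlContext.read
          .option("primitivesAsString", "true")
          .json("python/test_support/sql/people.json"))

# After this PR: the same options are first-class keyword parameters.
df_new = sqlContext.read.json("python/test_support/sql/people.json",
                              primitivesAsString="true")
```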
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql/readwriter.py | 232
1 file changed, 155 insertions(+), 77 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cc5e93dcad..e2ee9db049 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,7 +166,10 @@ class DataFrameReader(object):
return self._df(self._jreader.stream())
@since(1.4)
- def json(self, path, schema=None):
+ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+ allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
"""
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
(one object per record) and returns the result as a :class:`DataFrame`.
@@ -177,31 +180,36 @@ class DataFrameReader(object):
:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
+ :param primitivesAsString: infers all primitive values as a string type. If None is set,
+ it uses the default value, ``false``.
+ :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+ do not fit in decimal, then it infers them as doubles. If None is
+ set, it uses the default value, ``false``.
+ :param allowComments: ignores Java/C++ style comments in JSON records. If None is set,
+ it uses the default value, ``false``.
+ :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+ it uses the default value, ``false``.
+ :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+ set, it uses the default value, ``true``.
+ :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+ set, it uses the default value, ``false``.
+ :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all characters
+ using the backslash quoting mechanism. If None is
+ set, it uses the default value, ``false``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
- You can set the following JSON-specific options to deal with non-standard JSON files:
- * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
- type
- * `prefersDecimal` (default `false`): infers all floating-point values as a decimal \
- type. If the values do not fit in decimal, then it infers them as doubles.
- * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
- * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
- * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
- quotes
- * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
- (e.g. 00012)
- * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
- of all character using backslash quoting mechanism
- * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
- during parsing.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record and puts the malformed string into a new field configured by \
``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
- * ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
- new field having malformed string created by ``PERMISSIVE`` mode. \
- This overrides ``spark.sql.columnNameOfCorruptRecord``.
+
+ :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+ created by ``PERMISSIVE`` mode. This overrides
+ ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the default value ``_corrupt_record``.
>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -214,6 +222,24 @@ class DataFrameReader(object):
"""
if schema is not None:
self.schema(schema)
+ if primitivesAsString is not None:
+ self.option("primitivesAsString", primitivesAsString)
+ if prefersDecimal is not None:
+ self.option("prefersDecimal", prefersDecimal)
+ if allowComments is not None:
+ self.option("allowComments", allowComments)
+ if allowUnquotedFieldNames is not None:
+ self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+ if allowSingleQuotes is not None:
+ self.option("allowSingleQuotes", allowSingleQuotes)
+ if allowNumericLeadingZero is not None:
+ self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+ if allowBackslashEscapingAnyCharacter is not None:
+ self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
+ if mode is not None:
+ self.option("mode", mode)
+ if columnNameOfCorruptRecord is not None:
+ self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
elif type(path) == list:
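As a standalone illustration of the guard pattern above — each keyword is forwarded to ``option()`` only when the caller actually set it, so unset parameters keep the data source defaults — here is a minimal sketch in plain Python (no Spark needed; ``Reader`` is a hypothetical stand-in for ``DataFrameReader``):

```python
class Reader(object):
    """Hypothetical stand-in for DataFrameReader's option plumbing."""

    def __init__(self):
        self._options = {}

    def option(self, key, value):
        # Mirrors DataFrameReader.option(): record one named option.
        self._options[key] = value
        return self

    def json(self, path, primitivesAsString=None, mode=None):
        # None means "not set by the caller": only explicit keywords
        # are forwarded, everything else keeps the source's default.
        if primitivesAsString is not None:
            self.option("primitivesAsString", primitivesAsString)
        if mode is not None:
            self.option("mode", mode)
        return path, dict(self._options)

print(Reader().json("people.json", mode="DROPMALFORMED"))
# ('people.json', {'mode': 'DROPMALFORMED'})
```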
@@ -270,53 +296,62 @@ class DataFrameReader(object):
[Row(value=u'hello'), Row(value=u'this')]
"""
if isinstance(paths, basestring):
- paths = [paths]
- return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+ path = [paths]
+ return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
@since(2.0)
- def csv(self, paths):
+ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+ comment=None, header=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
+ nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None,
+ maxColumns=None, maxCharsPerColumn=None, mode=None):
"""Loads a CSV file and returns the result as a [[DataFrame]].
This function goes through the input once to determine the input schema. To avoid going
through the entire data once, specify the schema explicitly using [[schema]].
- :param paths: string, or list of strings, for input path(s).
-
- You can set the following CSV-specific options to deal with CSV files:
- * ``sep`` (default ``,``): sets the single character as a separator \
- for each field and value.
- * ``charset`` (default ``UTF-8``): decodes the CSV files by the given \
- encoding type.
- * ``quote`` (default ``"``): sets the single character used for escaping \
- quoted values where the separator can be part of the value.
- * ``escape`` (default ``\``): sets the single character used for escaping quotes \
- inside an already quoted value.
- * ``comment`` (default empty string): sets the single character used for skipping \
- lines beginning with this character. By default, it is disabled.
- * ``header`` (default ``false``): uses the first line as names of columns.
- * ``ignoreLeadingWhiteSpace`` (default ``false``): defines whether or not leading \
- whitespaces from values being read should be skipped.
- * ``ignoreTrailingWhiteSpace`` (default ``false``): defines whether or not trailing \
- whitespaces from values being read should be skipped.
- * ``nullValue`` (default empty string): sets the string representation of a null value.
- * ``nanValue`` (default ``NaN``): sets the string representation of a non-number \
- value.
- * ``positiveInf`` (default ``Inf``): sets the string representation of a positive \
- infinity value.
- * ``negativeInf`` (default ``-Inf``): sets the string representation of a negative \
- infinity value.
- * ``dateFormat`` (default ``None``): sets the string that indicates a date format. \
- Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This \
- applies to both date type and timestamp type. By default, it is None which means \
- trying to parse times and date by ``java.sql.Timestamp.valueOf()`` and \
- ``java.sql.Date.valueOf()``.
- * ``maxColumns`` (default ``20480``): defines a hard limit of how many columns \
- a record can have.
- * ``maxCharsPerColumn`` (default ``1000000``): defines the maximum number of \
- characters allowed for any given value being read.
- * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
- during parsing.
- * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. \
+ :param path: string, or list of strings, for input path(s).
+ :param schema: an optional :class:`StructType` for the input schema.
+ :param sep: sets the single character as a separator for each field and value.
+ If None is set, it uses the default value, ``,``.
+ :param encoding: decodes the CSV files by the given encoding type. If None is set,
+ it uses the default value, ``UTF-8``.
+ :param quote: sets the single character used for escaping quoted values where the
+ separator can be part of the value. If None is set, it uses the default
+ value, ``"``.
+ :param escape: sets the single character used for escaping quotes inside an already
+ quoted value. If None is set, it uses the default value, ``\``.
+ :param comment: sets the single character used for skipping lines beginning with this
+ character. By default (None), it is disabled.
+ :param header: uses the first line as names of columns. If None is set, it uses the
+ default value, ``false``.
+ :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+ being read should be skipped. If None is set, it uses
+ the default value, ``false``.
+ :param nullValue: sets the string representation of a null value. If None is set, it uses
+ the default value, empty string.
+ :param nanValue: sets the string representation of a non-number value. If None is set, it
+ uses the default value, ``NaN``.
+ :param positiveInf: sets the string representation of a positive infinity value. If None
+ is set, it uses the default value, ``Inf``.
+ :param negativeInf: sets the string representation of a negative infinity value. If None
+ is set, it uses the default value, ``-Inf``.
+ :param dateFormat: sets the string that indicates a date format. Custom date formats
+ follow the formats at ``java.text.SimpleDateFormat``. This
+ applies to both date type and timestamp type. By default, it is None,
+ which means it tries to parse times and dates by
+ ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+ :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+ set, it uses the default value, ``20480``.
+ :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+ value being read. If None is set, it uses the default value,
+ ``1000000``.
+ :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
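To make the three parse modes concrete, a hedged sketch (same ``sqlContext`` assumption; ``/tmp/bad.csv`` is a hypothetical file containing one malformed row):

```python
# PERMISSIVE: keep malformed rows, setting unparsable fields to null.
kept = sqlContext.read.csv("/tmp/bad.csv", mode="PERMISSIVE")

# DROPMALFORMED: silently drop malformed rows.
dropped = sqlContext.read.csv("/tmp/bad.csv", mode="DROPMALFORMED")

# FAILFAST: raise an error on the first malformed row encountered.
strict = sqlContext.read.csv("/tmp/bad.csv", mode="FAILFAST")
```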
@@ -325,9 +360,43 @@ class DataFrameReader(object):
>>> df.dtypes
[('C0', 'string'), ('C1', 'string')]
"""
- if isinstance(paths, basestring):
- paths = [paths]
- return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+ if schema is not None:
+ self.schema(schema)
+ if sep is not None:
+ self.option("sep", sep)
+ if encoding is not None:
+ self.option("encoding", encoding)
+ if quote is not None:
+ self.option("quote", quote)
+ if escape is not None:
+ self.option("escape", escape)
+ if comment is not None:
+ self.option("comment", comment)
+ if header is not None:
+ self.option("header", header)
+ if ignoreLeadingWhiteSpace is not None:
+ self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+ if ignoreTrailingWhiteSpace is not None:
+ self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+ if nullValue is not None:
+ self.option("nullValue", nullValue)
+ if nanValue is not None:
+ self.option("nanValue", nanValue)
+ if positiveInf is not None:
+ self.option("positiveInf", positiveInf)
+ if negativeInf is not None:
+ self.option("negativeInf", negativeInf)
+ if dateFormat is not None:
+ self.option("dateFormat", dateFormat)
+ if maxColumns is not None:
+ self.option("maxColumns", maxColumns)
+ if maxCharsPerColumn is not None:
+ self.option("maxCharsPerColumn", maxCharsPerColumn)
+ if mode is not None:
+ self.option("mode", mode)
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
@since(1.5)
def orc(self, path):
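Putting the reader-side parameters together, a hedged usage sketch against the Spark 2.0-era API (the schema and sample path are assumptions for illustration):

```python
from pyspark.sql.types import StructType, StructField, StringType

# Supplying a schema up front skips the extra pass over the input
# that schema inference would otherwise require.
schema = StructType([StructField("C0", StringType(), True),
                     StructField("C1", StringType(), True)])

df = sqlContext.read.csv("python/test_support/sql/ages.csv",
                         schema=schema,
                         sep=",",
                         header="false",
                         nullValue="NA")
```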
@@ -687,7 +756,8 @@ class DataFrameWriter(object):
self._jwrite.text(path)
@since(2.0)
- def csv(self, path, mode=None, compression=None):
+ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
+ header=None, nullValue=None):
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -701,25 +771,33 @@ class DataFrameWriter(object):
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shortened names (none, bzip2, gzip, lz4,
snappy and deflate).
-
- You can set the following CSV-specific options to deal with CSV files:
- * ``sep`` (default ``,``): sets the single character as a separator \
- for each field and value.
- * ``quote`` (default ``"``): sets the single character used for escaping \
- quoted values where the separator can be part of the value.
- * ``escape`` (default ``\``): sets the single character used for escaping quotes \
- inside an already quoted value.
- * ``header`` (default ``false``): writes the names of columns as the first line.
- * ``nullValue`` (default empty string): sets the string representation of a null value.
- * ``compression``: compression codec to use when saving to file. This can be one of \
- the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and \
- deflate).
+ :param sep: sets the single character as a separator for each field and value. If None is
+ set, it uses the default value, ``,``.
+ :param quote: sets the single character used for escaping quoted values where the
+ separator can be part of the value. If None is set, it uses the default
+ value, ``"``.
+ :param escape: sets the single character used for escaping quotes inside an already
+ quoted value. If None is set, it uses the default value, ``\``.
+ :param header: writes the names of columns as the first line. If None is set, it uses
+ the default value, ``false``.
+ :param nullValue: sets the string representation of a null value. If None is set, it uses
+ the default value, empty string.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if compression is not None:
self.option("compression", compression)
+ if sep is not None:
+ self.option("sep", sep)
+ if quote is not None:
+ self.option("quote", quote)
+ if escape is not None:
+ self.option("escape", escape)
+ if header is not None:
+ self.option("header", header)
+ if nullValue is not None:
+ self.option("nullValue", nullValue)
self._jwrite.csv(path)
@since(1.5)
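And the matching writer side, as a hedged sketch assuming an existing DataFrame ``df`` (the output path is illustrative):

```python
import os
import tempfile

out = os.path.join(tempfile.mkdtemp(), "data")
df.write.csv(out,
             mode="overwrite",     # save mode, distinct from the parse mode
             compression="gzip",
             sep="\t",
             header="true",
             nullValue="NULL")
```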
@@ -797,7 +875,7 @@ def _test():
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext._createForTesting(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
- globs['sdf'] =\
+ globs['sdf'] = \
globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(