From d37c7f7f042f7943b5b684e53cf4284c601fb347 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 2 May 2016 17:50:40 -0700
Subject: [SPARK-15050][SQL] Put CSV and JSON options as Python csv and json function parameters

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-15050

This PR exposes the CSV and JSON options as function parameters in the Python API, for `DataFrameReader.json()`, `DataFrameReader.csv()` and `DataFrameWriter.csv()`, so they no longer have to be set through separate `.option()` calls.
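For example, reading JSON with options passed as keyword arguments (an illustrative sketch, not part of the patch itself; the path is the test-support file used in the doctests and the option values are arbitrary):

```python
# Before this patch: JSON options had to go through separate .option() calls.
df = (sqlContext.read
      .option("primitivesAsString", "true")
      .json('python/test_support/sql/people.json'))

# After this patch: the same options are keyword arguments of json().
df = sqlContext.read.json('python/test_support/sql/people.json',
                          primitivesAsString='true', allowComments='true')
```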
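Reading CSV works the same way (again a sketch; `ages.csv` is the test-support file from the doctests and the chosen option values are arbitrary):

```python
# CSV read options as keyword arguments instead of .option() calls.
df = sqlContext.read.csv('python/test_support/sql/ages.csv',
                         sep=',', header='false', nullValue='NA')
```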
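And writing CSV (a sketch; the temporary output directory mirrors the doctest below, and the option values are arbitrary):

```python
import os
import tempfile

# CSV write options as keyword arguments on DataFrameWriter.csv().
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'),
             sep='\t', header='true', nullValue='NULL')
```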
## How was this patch tested?

This was tested by `./dev/run-tests`.

Author: hyukjinkwon
Author: Hyukjin Kwon

Closes #12834 from HyukjinKwon/SPARK-15050.
---
 python/pyspark/sql/readwriter.py | 232 ++++++++++++++++++++++++++-------------
 1 file changed, 155 insertions(+), 77 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cc5e93dcad..e2ee9db049 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,7 +166,10 @@ class DataFrameReader(object):
         return self._df(self._jreader.stream())
 
     @since(1.4)
-    def json(self, path, schema=None):
+    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+             mode=None, columnNameOfCorruptRecord=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
         (one object per record) and returns the result as a :class:`DataFrame`.
@@ -177,31 +180,36 @@ class DataFrameReader(object):
         :param path: string represents path to the JSON dataset,
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
+        :param primitivesAsString: infers all primitive values as a string type. If None is set,
+                                   it uses the default value, ``false``.
+        :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+                               do not fit in decimal, then it infers them as doubles. If None is
+                               set, it uses the default value, ``false``.
+        :param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
+                              it uses the default value, ``false``.
+        :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+                                        it uses the default value, ``false``.
+        :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+                                  set, it uses the default value, ``true``.
+        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+                                        set, it uses the default value, ``false``.
+        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
+                                                   using backslash quoting mechanism. If None is
+                                                   set, it uses the default value, ``false``.
+        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
 
-        You can set the following JSON-specific options to deal with non-standard JSON files:
-            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
-                type
-            * ``prefersDecimal`` (default ``false``): infers all floating-point values as a decimal \
-                type. If the values do not fit in decimal, then it infers them as doubles.
-            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
-            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
-            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
-                quotes
-            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
-                (e.g. 00012)
-            * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
-                of all character using backslash quoting mechanism
-            * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
-                during parsing.
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                   record and puts the malformed string into a new field configured by \
                   ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                   ``null`` for extra fields.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
-            * ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
-                new field having malformed string created by ``PERMISSIVE`` mode. \
-                This overrides ``spark.sql.columnNameOfCorruptRecord``.
+
+        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+                                          created by ``PERMISSIVE`` mode. This overrides
+                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+                                          it uses the default value ``_corrupt_record``.
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -214,6 +222,24 @@ class DataFrameReader(object):
         """
         if schema is not None:
             self.schema(schema)
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            self.option("allowNumericLeadingZeros", allowNumericLeadingZero)
+        if allowBackslashEscapingAnyCharacter is not None:
+            self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
+        if mode is not None:
+            self.option("mode", mode)
+        if columnNameOfCorruptRecord is not None:
+            self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         elif type(path) == list:
@@ -270,53 +296,62 @@ class DataFrameReader(object):
         [Row(value=u'hello'), Row(value=u'this')]
         """
         if isinstance(paths, basestring):
-            paths = [paths]
-        return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+            path = [paths]
+        else:
+            path = paths
+        return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 
     @since(2.0)
-    def csv(self, paths):
+    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+            comment=None, header=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
+            nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None,
+            maxColumns=None, maxCharsPerColumn=None, mode=None):
         """Loads a CSV file and returns the result as a [[DataFrame]].
 
         This function goes through the input once to determine the input schema. To avoid going
         through the entire data once, specify the schema explicitly using [[schema]].
-        :param paths: string, or list of strings, for input path(s).
-
-        You can set the following CSV-specific options to deal with CSV files:
-            * ``sep`` (default ``,``): sets the single character as a separator \
-                for each field and value.
-            * ``charset`` (default ``UTF-8``): decodes the CSV files by the given \
-                encoding type.
-            * ``quote`` (default ``"``): sets the single character used for escaping \
-                quoted values where the separator can be part of the value.
-            * ``escape`` (default ``\``): sets the single character used for escaping quotes \
-                inside an already quoted value.
-            * ``comment`` (default empty string): sets the single character used for skipping \
-                lines beginning with this character. By default, it is disabled.
-            * ``header`` (default ``false``): uses the first line as names of columns.
-            * ``ignoreLeadingWhiteSpace`` (default ``false``): defines whether or not leading \
-                whitespaces from values being read should be skipped.
-            * ``ignoreTrailingWhiteSpace`` (default ``false``): defines whether or not trailing \
-                whitespaces from values being read should be skipped.
-            * ``nullValue`` (default empty string): sets the string representation of a null value.
-            * ``nanValue`` (default ``NaN``): sets the string representation of a non-number \
-                value.
-            * ``positiveInf`` (default ``Inf``): sets the string representation of a positive \
-                infinity value.
-            * ``negativeInf`` (default ``-Inf``): sets the string representation of a negative \
-                infinity value.
-            * ``dateFormat`` (default ``None``): sets the string that indicates a date format. \
-                Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This \
-                applies to both date type and timestamp type. By default, it is None which means \
-                trying to parse times and date by ``java.sql.Timestamp.valueOf()`` and \
-                ``java.sql.Date.valueOf()``.
-            * ``maxColumns`` (default ``20480``): defines a hard limit of how many columns \
-                a record can have.
-            * ``maxCharsPerColumn`` (default ``1000000``): defines the maximum number of \
-                characters allowed for any given value being read.
-            * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
-                during parsing.
-            * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. \
+        :param path: string, or list of strings, for input path(s).
+        :param schema: an optional :class:`StructType` for the input schema.
+        :param sep: sets the single character as a separator for each field and value.
+                    If None is set, it uses the default value, ``,``.
+        :param encoding: decodes the CSV files by the given encoding type. If None is set,
+                         it uses the default value, ``UTF-8``.
+        :param quote: sets the single character used for escaping quoted values where the
+                      separator can be part of the value. If None is set, it uses the default
+                      value, ``"``.
+        :param escape: sets the single character used for escaping quotes inside an already
+                       quoted value. If None is set, it uses the default value, ``\``.
+        :param comment: sets the single character used for skipping lines beginning with this
+                        character. By default (None), it is disabled.
+        :param header: uses the first line as names of columns. If None is set, it uses the
+                       default value, ``false``.
+        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+                                        being read should be skipped. If None is set, it uses
+                                        the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+                                         being read should be skipped. If None is set, it uses
+                                         the default value, ``false``.
+        :param nullValue: sets the string representation of a null value. If None is set, it uses
+                          the default value, empty string.
+        :param nanValue: sets the string representation of a non-number value. If None is set, it
+                         uses the default value, ``NaN``.
+        :param positiveInf: sets the string representation of a positive infinity value. If None
+                            is set, it uses the default value, ``Inf``.
+        :param negativeInf: sets the string representation of a negative infinity value. If None
+                            is set, it uses the default value, ``-Inf``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to both date type and timestamp type. By default, it is None
+                           which means trying to parse times and date by
+                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+                           set, it uses the default value, ``20480``.
+        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+                                  value being read. If None is set, it uses the default value,
+                                  ``1000000``.
+        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
+
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                   When a schema is set by user, it sets ``null`` for extra fields.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
@@ -325,9 +360,43 @@ class DataFrameReader(object):
         >>> df.dtypes
         [('C0', 'string'), ('C1', 'string')]
         """
-        if isinstance(paths, basestring):
-            paths = [paths]
-        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+        if schema is not None:
+            self.schema(schema)
+        if sep is not None:
+            self.option("sep", sep)
+        if encoding is not None:
+            self.option("encoding", encoding)
+        if quote is not None:
+            self.option("quote", quote)
+        if escape is not None:
+            self.option("escape", escape)
+        if comment is not None:
+            self.option("comment", comment)
+        if header is not None:
+            self.option("header", header)
+        if ignoreLeadingWhiteSpace is not None:
+            self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+        if ignoreTrailingWhiteSpace is not None:
+            self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+        if nullValue is not None:
+            self.option("nullValue", nullValue)
+        if nanValue is not None:
+            self.option("nanValue", nanValue)
+        if positiveInf is not None:
+            self.option("positiveInf", positiveInf)
+        if negativeInf is not None:
+            self.option("negativeInf", negativeInf)
+        if dateFormat is not None:
+            self.option("dateFormat", dateFormat)
+        if maxColumns is not None:
+            self.option("maxColumns", maxColumns)
+        if maxCharsPerColumn is not None:
+            self.option("maxCharsPerColumn", maxCharsPerColumn)
+        if mode is not None:
+            self.option("mode", mode)
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 
     @since(1.5)
     def orc(self, path):
@@ -687,7 +756,8 @@ class DataFrameWriter(object):
         self._jwrite.text(path)
 
     @since(2.0)
-    def csv(self, path, mode=None, compression=None):
+    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
+            header=None, nullValue=None):
         """Saves the content of the [[DataFrame]] in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -701,25 +771,33 @@ class DataFrameWriter(object):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names
                             (none, bzip2, gzip, lz4, snappy and deflate).
-
-        You can set the following CSV-specific options to deal with CSV files:
-            * ``sep`` (default ``,``): sets the single character as a separator \
-                for each field and value.
-            * ``quote`` (default ``"``): sets the single character used for escaping \
-                quoted values where the separator can be part of the value.
-            * ``escape`` (default ``\``): sets the single character used for escaping quotes \
-                inside an already quoted value.
-            * ``header`` (default ``false``): writes the names of columns as the first line.
-            * ``nullValue`` (default empty string): sets the string representation of a null value.
-            * ``compression``: compression codec to use when saving to file. This can be one of \
-                the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and \
-                deflate).
+        :param sep: sets the single character as a separator for each field and value. If None is
+                    set, it uses the default value, ``,``.
+        :param quote: sets the single character used for escaping quoted values where the
+                      separator can be part of the value. If None is set, it uses the default
+                      value, ``"``.
+        :param escape: sets the single character used for escaping quotes inside an already
+                       quoted value. If None is set, it uses the default value, ``\``.
+        :param header: writes the names of columns as the first line. If None is set, it uses
+                       the default value, ``false``.
+        :param nullValue: sets the string representation of a null value. If None is set, it uses
+                          the default value, empty string.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if compression is not None:
             self.option("compression", compression)
+        if sep is not None:
+            self.option("sep", sep)
+        if quote is not None:
+            self.option("quote", quote)
+        if escape is not None:
+            self.option("escape", escape)
+        if header is not None:
+            self.option("header", header)
+        if nullValue is not None:
+            self.option("nullValue", nullValue)
         self._jwrite.csv(path)
 
     @since(1.5)
@@ -797,7 +875,7 @@ def _test():
     globs['sqlContext'] = SQLContext(sc)
     globs['hiveContext'] = HiveContext._createForTesting(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
-    globs['sdf'] =\
+    globs['sdf'] = \
         globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
 
     (failure_count, test_count) = doctest.testmod(
--
cgit v1.2.3