diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql/readwriter.py | 13 | ||||
-rw-r--r-- | python/pyspark/sql/streaming.py | 14 | ||||
-rw-r--r-- | python/pyspark/sql/tests.py | 7 | ||||
-rw-r--r-- | python/test_support/sql/people_array.json | 13 |
4 files changed, 37 insertions, 10 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1678334889..6bed390e60 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -159,11 +159,12 @@ class DataFrameReader(OptionUtils): allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, - timeZone=None): + timeZone=None, wholeFile=None): """ - Loads a JSON file (`JSON Lines text format or newline-delimited JSON - <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per - record) and returns the result as a :class`DataFrame`. + Loads a JSON file and returns the results as a :class:`DataFrame`. + + Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_ + (newline-delimited JSON) are supported and can be selected with the `wholeFile` parameter. If the ``schema`` parameter is not specified, this function goes through the input once to determine the input schema. @@ -212,6 +213,8 @@ class DataFrameReader(OptionUtils): default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. + :param wholeFile: parse one record, which may span multiple lines, per file. If None is + set, it uses the default value, ``false``. >>> df1 = spark.read.json('python/test_support/sql/people.json') >>> df1.dtypes @@ -228,7 +231,7 @@ class DataFrameReader(OptionUtils): allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat, timeZone=timeZone) + timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile) if isinstance(path, basestring): path = [path] if type(path) == list: diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index d988e596a8..965c8f6b26 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -428,11 +428,13 @@ class DataStreamReader(OptionUtils): def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None, dateFormat=None, - timestampFormat=None, timeZone=None): + mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, + timeZone=None, wholeFile=None): """ - Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON - <http://jsonlines.org/>`_) and returns a :class`DataFrame`. + Loads a JSON file stream and returns the results as a :class:`DataFrame`. + + Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_ + (newline-delimited JSON) are supported and can be selected with the `wholeFile` parameter. If the ``schema`` parameter is not specified, this function goes through the input once to determine the input schema. @@ -483,6 +485,8 @@ class DataStreamReader(OptionUtils): default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. If None is set, it uses the default value, session local timezone. + :param wholeFile: parse one record, which may span multiple lines, per file. If None is + set, it uses the default value, ``false``. >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema) >>> json_sdf.isStreaming @@ -496,7 +500,7 @@ class DataStreamReader(OptionUtils): allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat, timeZone=timeZone) + timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile) if isinstance(path, basestring): return self._df(self._jreader.json(path)) else: diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d8b7b3137c..9058443285 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -439,6 +439,13 @@ class SQLTests(ReusedPySparkTestCase): res.explain(True) self.assertEqual(res.collect(), [Row(id=0, copy=0)]) + def test_wholefile_json(self): + from pyspark.sql.types import StringType + people1 = self.spark.read.json("python/test_support/sql/people.json") + people_array = self.spark.read.json("python/test_support/sql/people_array.json", + wholeFile=True) + self.assertEqual(people1.collect(), people_array.collect()) + def test_udf_with_input_file_name(self): from pyspark.sql.functions import udf, input_file_name from pyspark.sql.types import StringType diff --git a/python/test_support/sql/people_array.json b/python/test_support/sql/people_array.json new file mode 100644 index 0000000000..c27c48fe34 --- /dev/null +++ b/python/test_support/sql/people_array.json @@ -0,0 +1,13 @@ +[ + { + "name": "Michael" + }, + { + "name": "Andy", + "age": 30 + }, + { + "name": "Justin", + "age": 19 + } +] |