Diffstat (limited to 'python')
 python/pyspark/sql/readwriter.py          | 13 ++++++++-----
 python/pyspark/sql/streaming.py           | 14 +++++++++------
 python/pyspark/sql/tests.py               |  7 +++++++
 python/test_support/sql/people_array.json | 13 +++++++++++++
 4 files changed, 37 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1678334889..6bed390e60 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -159,11 +159,12 @@ class DataFrameReader(OptionUtils):
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
- timeZone=None):
+ timeZone=None, wholeFile=None):
"""
- Loads a JSON file (`JSON Lines text format or newline-delimited JSON
- <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
- record) and returns the result as a :class`DataFrame`.
+ Loads a JSON file and returns the results as a :class:`DataFrame`.
+
+ Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_
+ (newline-delimited JSON) are supported and can be selected with the ``wholeFile`` parameter.

If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -212,6 +213,8 @@ class DataFrameReader(OptionUtils):
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
+ :param wholeFile: parse one record, which may span multiple lines, per file. If None is
+ set, it uses the default value, ``false``.

>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -228,7 +231,7 @@ class DataFrameReader(OptionUtils):
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat, timeZone=timeZone)
+ timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
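
With this change, the batch reader can parse JSON documents that span multiple lines. A minimal usage sketch, assuming an active SparkSession bound to `spark`; the two fixture paths are the ones exercised by this patch's test:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Default (wholeFile unset/False): JSON Lines, one record per line.
    df_lines = spark.read.json("python/test_support/sql/people.json")

    # wholeFile=True: each file is parsed as one record, which may span
    # multiple lines; the new fixture holds a top-level JSON array.
    df_whole = spark.read.json("python/test_support/sql/people_array.json",
                               wholeFile=True)
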
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index d988e596a8..965c8f6b26 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -428,11 +428,13 @@ class DataStreamReader(OptionUtils):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
- timestampFormat=None, timeZone=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+ timeZone=None, wholeFile=None):
"""
- Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
- <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
+ Loads a JSON file stream and returns the results as a :class:`DataFrame`.
+
+ Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_
+ (newline-delimited JSON) are supported and can be selected with the ``wholeFile`` parameter.

If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -483,6 +485,8 @@ class DataStreamReader(OptionUtils):
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
If None is set, it uses the default value, session local timezone.
+ :param wholeFile: parse one record, which may span multiple lines, per file. If None is
+ set, it uses the default value, ``false``.

>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -496,7 +500,7 @@ class DataStreamReader(OptionUtils):
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat, timeZone=timeZone)
+ timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
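
The streaming reader gains the same option. A sketch under two assumptions: the `spark` session from the example above, and a hypothetical input directory /tmp/json-input; note that file stream sources also require an explicit schema unless schema inference is enabled:

    from pyspark.sql.types import LongType, StringType, StructField, StructType

    schema = StructType([StructField("age", LongType()),
                         StructField("name", StringType())])

    # Each file that lands in the directory is parsed as a single
    # (possibly multi-line) JSON record instead of line by line.
    json_sdf = spark.readStream.json("/tmp/json-input",  # hypothetical path
                                     schema=schema,
                                     wholeFile=True)
    assert json_sdf.isStreaming
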
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d8b7b3137c..9058443285 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -439,6 +439,13 @@ class SQLTests(ReusedPySparkTestCase):
res.explain(True)
self.assertEqual(res.collect(), [Row(id=0, copy=0)])

+ def test_wholefile_json(self):
+ from pyspark.sql.types import StringType
+ people1 = self.spark.read.json("python/test_support/sql/people.json")
+ people_array = self.spark.read.json("python/test_support/sql/people_array.json",
+ wholeFile=True)
+ self.assertEqual(people1.collect(), people_array.collect())
+
def test_udf_with_input_file_name(self):
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
diff --git a/python/test_support/sql/people_array.json b/python/test_support/sql/people_array.json
new file mode 100644
index 0000000000..c27c48fe34
--- /dev/null
+++ b/python/test_support/sql/people_array.json
@@ -0,0 +1,13 @@
+[
+ {
+ "name": "Michael"
+ },
+ {
+ "name": "Andy",
+ "age": 30
+ },
+ {
+ "name": "Justin",
+ "age": 19
+ }
+]
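
For reference, reading this fixture with the option enabled should yield the same rows as the line-delimited people.json, which is exactly what test_wholefile_json above asserts. A doctest-style sketch of the expected result (fields follow the inferred, alphabetically ordered schema):

    >>> df = spark.read.json('python/test_support/sql/people_array.json', wholeFile=True)
    >>> df.dtypes
    [('age', 'bigint'), ('name', 'string')]
    >>> df.collect()
    [Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]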