aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/readwriter.py
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2015-08-26 22:19:11 -0700
committerReynold Xin <rxin@databricks.com>2015-08-26 22:19:11 -0700
commitce97834dc0cc55eece0e909a4061ca6f2123f60d (patch)
tree4ae36226fa30cf275654aaa8d9a4b663dbf7f5b7 /python/pyspark/sql/readwriter.py
parent0fac144f6bd835395059154532d72cdb5dc7ef8d (diff)
downloadspark-ce97834dc0cc55eece0e909a4061ca6f2123f60d.tar.gz
spark-ce97834dc0cc55eece0e909a4061ca6f2123f60d.tar.bz2
spark-ce97834dc0cc55eece0e909a4061ca6f2123f60d.zip
[SPARK-9964] [PYSPARK] [SQL] PySpark DataFrameReader accept RDD of String for JSON
PySpark DataFrameReader should accept an RDD of Strings (like the Scala version does) for JSON, rather than only taking a path. If this PR is merged, it should be duplicated to cover the other input types (not just JSON). Author: Yanbo Liang <ybliang8@gmail.com> Closes #8444 from yanboliang/spark-9964.
Diffstat (limited to 'python/pyspark/sql/readwriter.py')
-rw-r--r--python/pyspark/sql/readwriter.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 78247c8fa7..3fa6895880 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -15,8 +15,14 @@
# limitations under the License.
#
+import sys
+
+if sys.version >= '3':
+ basestring = unicode = str
+
from py4j.java_gateway import JavaClass
+from pyspark import RDD
from pyspark.sql import since
from pyspark.sql.column import _to_seq
from pyspark.sql.types import *
@@ -125,23 +131,33 @@ class DataFrameReader(object):
@since(1.4)
def json(self, path, schema=None):
"""
- Loads a JSON file (one object per line) and returns the result as
- a :class`DataFrame`.
+ Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
+ (one object per record) and returns the result as a :class`DataFrame`.
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
- :param path: string, path to the JSON dataset.
+ :param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
- >>> df = sqlContext.read.json('python/test_support/sql/people.json')
- >>> df.dtypes
+ >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
+ >>> df1.dtypes
+ [('age', 'bigint'), ('name', 'string')]
+ >>> rdd = sc.textFile('python/test_support/sql/people.json')
+ >>> df2 = sqlContext.read.json(rdd)
+ >>> df2.dtypes
[('age', 'bigint'), ('name', 'string')]
"""
if schema is not None:
self.schema(schema)
- return self._df(self._jreader.json(path))
+ if isinstance(path, basestring):
+ return self._df(self._jreader.json(path))
+ elif isinstance(path, RDD):
+ return self._df(self._jreader.json(path._jrdd))
+ else:
+ raise TypeError("path can be only string or RDD")
@since(1.4)
def table(self, tableName):