aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDan McClary <dan.mcclary@gmail.com>2014-11-20 13:36:50 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-20 13:44:55 -0800
commit21f582f12b4d00017b990bcc232dcbf546b5dbe7 (patch)
tree429465f5a0544f9889f4103244f037bfff5252c3 /python
parent2fb683c585d8f30a7b19027b941812c922e7d99a (diff)
downloadspark-21f582f12b4d00017b990bcc232dcbf546b5dbe7.tar.gz
spark-21f582f12b4d00017b990bcc232dcbf546b5dbe7.tar.bz2
spark-21f582f12b4d00017b990bcc232dcbf546b5dbe7.zip
[SPARK-4228][SQL] SchemaRDD to JSON
Here's a simple fix for SchemaRDD to JSON. Author: Dan McClary <dan.mcclary@gmail.com> Closes #3213 from dwmclary/SPARK-4228 and squashes the following commits: d714e1d [Dan McClary] fixed PEP 8 error cac2879 [Dan McClary] move pyspark comment and doctest to correct location f9471d3 [Dan McClary] added pyspark doc and doctest 6598cee [Dan McClary] adding complex type queries 1a5fd30 [Dan McClary] removing SPARK-4228 from SQLQuerySuite 4a651f0 [Dan McClary] cleaned PEP and Scala style failures. Moved tests to JsonSuite 47ceff6 [Dan McClary] cleaned up scala style issues 2ee1e70 [Dan McClary] moved rowToJSON to JsonRDD 4387dd5 [Dan McClary] Added UserDefinedType, cleaned up case formatting 8f7bfb6 [Dan McClary] Map type added to SchemaRDD.toJSON 1b11980 [Dan McClary] Map and UserDefinedTypes partially done 11d2016 [Dan McClary] formatting and unicode deserialization default fixed 6af72d1 [Dan McClary] deleted extaneous comment 4d11c0c [Dan McClary] JsonFactory rewrite of toJSON for SchemaRDD 149dafd [Dan McClary] wrapped scala toJSON in sql.py 5e5eb1b [Dan McClary] switched to Jackson for JSON processing 6c94a54 [Dan McClary] added toJSON to pyspark SchemaRDD aaeba58 [Dan McClary] added toJSON to pyspark SchemaRDD 1d171aa [Dan McClary] upated missing brace on if statement 319e3ba [Dan McClary] updated to upstream master with merged SPARK-4228 424f130 [Dan McClary] tests pass, ready for pull and PR 626a5b1 [Dan McClary] added toJSON to SchemaRDD f7d166a [Dan McClary] added toJSON method 5d34e37 [Dan McClary] merge resolved d6d19e9 [Dan McClary] pr example (cherry picked from commit b8e6886fb8ff8f667fb7e600cd727d8649cad1d1) Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/sql.py17
1 files changed, 16 insertions, 1 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index e5d62a466c..abb284d1e3 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -45,7 +45,7 @@ from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
- CloudPickleSerializer
+ CloudPickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -1870,6 +1870,21 @@ class SchemaRDD(RDD):
rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
return SchemaRDD(rdd, self.sql_ctx)
+ def toJSON(self, use_unicode=False):
+ """Convert a SchemaRDD into a MappedRDD of JSON documents; one document per row.
+
+ >>> srdd1 = sqlCtx.jsonRDD(json)
+ >>> sqlCtx.registerRDDAsTable(srdd1, "table1")
+ >>> srdd2 = sqlCtx.sql( "SELECT * from table1")
+ >>> srdd2.toJSON().take(1)[0] == '{"field1":1,"field2":"row1","field3":{"field4":11}}'
+ True
+ >>> srdd3 = sqlCtx.sql( "SELECT field3.field4 from table1")
+ >>> srdd3.toJSON().collect() == ['{"field4":11}', '{"field4":22}', '{"field4":33}']
+ True
+ """
+ rdd = self._jschema_rdd.baseSchemaRDD().toJSON()
+ return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
+
def saveAsParquetFile(self, path):
"""Save the contents as a Parquet file, preserving the schema.