about summary refs log tree commit diff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql.py  17
1 file changed, 16 insertions, 1 deletion
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index e5d62a466c..abb284d1e3 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -45,7 +45,7 @@ from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
- CloudPickleSerializer
+ CloudPickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -1870,6 +1870,21 @@ class SchemaRDD(RDD):
rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
return SchemaRDD(rdd, self.sql_ctx)
+ def toJSON(self, use_unicode=False):
+ """Convert a SchemaRDD into a MappedRDD of JSON documents; one document per row.
+
+ >>> srdd1 = sqlCtx.jsonRDD(json)
+ >>> sqlCtx.registerRDDAsTable(srdd1, "table1")
+ >>> srdd2 = sqlCtx.sql( "SELECT * from table1")
+ >>> srdd2.toJSON().take(1)[0] == '{"field1":1,"field2":"row1","field3":{"field4":11}}'
+ True
+ >>> srdd3 = sqlCtx.sql( "SELECT field3.field4 from table1")
+ >>> srdd3.toJSON().collect() == ['{"field4":11}', '{"field4":22}', '{"field4":33}']
+ True
+ """
+ rdd = self._jschema_rdd.baseSchemaRDD().toJSON()
+ return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
+
def saveAsParquetFile(self, path):
"""Save the contents as a Parquet file, preserving the schema.