diff options
Diffstat (limited to 'python/pyspark/sql.py')
-rw-r--r-- | python/pyspark/sql.py | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e5d62a466c..abb284d1e3 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -45,7 +45,7 @@ from py4j.java_collections import ListConverter, MapConverter from pyspark.rdd import RDD from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \ - CloudPickleSerializer + CloudPickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync @@ -1870,6 +1870,21 @@ class SchemaRDD(RDD): rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD() return SchemaRDD(rdd, self.sql_ctx) + def toJSON(self, use_unicode=False): + """Convert a SchemaRDD into a MappedRDD of JSON documents; one document per row. + + >>> srdd1 = sqlCtx.jsonRDD(json) + >>> sqlCtx.registerRDDAsTable(srdd1, "table1") + >>> srdd2 = sqlCtx.sql( "SELECT * from table1") + >>> srdd2.toJSON().take(1)[0] == '{"field1":1,"field2":"row1","field3":{"field4":11}}' + True + >>> srdd3 = sqlCtx.sql( "SELECT field3.field4 from table1") + >>> srdd3.toJSON().collect() == ['{"field4":11}', '{"field4":22}', '{"field4":33}'] + True + """ + rdd = self._jschema_rdd.baseSchemaRDD().toJSON() + return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode)) + def saveAsParquetFile(self, path): """Save the contents as a Parquet file, preserving the schema. |