diff options
author | Yin Huai <huai@cse.ohio-state.edu> | 2014-06-17 19:14:59 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-17 19:14:59 -0700 |
commit | d2f4f30b12f99358953e2781957468e2cfe3c916 (patch) | |
tree | 405b949a2968dba2c73874bd2fefc9d10206e731 /python | |
parent | b2ebf429e24566c29850c570f8d76943151ad78c (diff) | |
download | spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.gz spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.bz2 spark-d2f4f30b12f99358953e2781957468e2cfe3c916.zip |
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext
Author: Yin Huai <huai@cse.ohio-state.edu>
Closes #999 from yhuai/newJson and squashes the following commits:
227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql.py | 64 |
1 files changed, 62 insertions, 2 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index c31d49ce83..5051c82da3 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pyspark.rdd import RDD +from pyspark.rdd import RDD, PipelinedRDD from pyspark.serializers import BatchedSerializer, PickleSerializer from py4j.protocol import Py4JError @@ -137,6 +137,53 @@ class SQLContext: jschema_rdd = self._ssql_ctx.parquetFile(path) return SchemaRDD(jschema_rdd, self) + + def jsonFile(self, path): + """Loads a text file storing one JSON object per line, + returning the result as a L{SchemaRDD}. + It goes through the entire dataset once to determine the schema. + + >>> import tempfile, shutil + >>> jsonFile = tempfile.mkdtemp() + >>> shutil.rmtree(jsonFile) + >>> ofn = open(jsonFile, 'w') + >>> for json in jsonStrings: + ... print>>ofn, json + >>> ofn.close() + >>> srdd = sqlCtx.jsonFile(jsonFile) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1") + >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}}, + ... {"f1": 2, "f2": "row2", "f3":{"field4":22}}, + ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}] + True + """ + jschema_rdd = self._ssql_ctx.jsonFile(path) + return SchemaRDD(jschema_rdd, self) + + def jsonRDD(self, rdd): + """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}. + It goes through the entire dataset once to determine the schema. + + >>> srdd = sqlCtx.jsonRDD(json) + >>> sqlCtx.registerRDDAsTable(srdd, "table1") + >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1") + >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}}, + ... {"f1": 2, "f2": "row2", "f3":{"field4":22}}, + ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}] + True + """ + def func(split, iterator): + for x in iterator: + if not isinstance(x, basestring): + x = unicode(x) + yield x.encode("utf-8") + keyed = PipelinedRDD(rdd, func) + keyed._bypass_serializer = True + jrdd = keyed._jrdd.map(self._jvm.BytesToString()) + jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd()) + return SchemaRDD(jschema_rdd, self) + def sql(self, sqlQuery): """Return a L{SchemaRDD} representing the result of the given query. @@ -265,7 +312,7 @@ class SchemaRDD(RDD): For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on directly, as it's underlying - implementation is a RDD composed of Java objects. Instead it is + implementation is an RDD composed of Java objects. Instead it is converted to a PythonRDD in the JVM, on which Python operations can be done. """ @@ -337,6 +384,14 @@ class SchemaRDD(RDD): """Creates a new table with the contents of this SchemaRDD.""" self._jschema_rdd.saveAsTable(tableName) + def schemaString(self): + """Returns the output schema in the tree format.""" + return self._jschema_rdd.schemaString() + + def printSchema(self): + """Prints out the schema in the tree format.""" + print self.schemaString() + def count(self): """Return the number of elements in this RDD. @@ -436,6 +491,11 @@ def _test(): globs['sqlCtx'] = SQLContext(sc) globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}', + '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}', + '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}'] + globs['jsonStrings'] = jsonStrings + globs['json'] = sc.parallelize(jsonStrings) globs['nestedRdd1'] = sc.parallelize([ {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) |