author      Yin Huai <huai@cse.ohio-state.edu>    2014-06-17 19:14:59 -0700
committer   Reynold Xin <rxin@apache.org>         2014-06-17 19:14:59 -0700
commit      d2f4f30b12f99358953e2781957468e2cfe3c916 (patch)
tree        405b949a2968dba2c73874bd2fefc9d10206e731 /python
parent      b2ebf429e24566c29850c570f8d76943151ad78c (diff)
download    spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.gz
            spark-d2f4f30b12f99358953e2781957468e2cfe3c916.tar.bz2
            spark-d2f4f30b12f99358953e2781957468e2cfe3c916.zip
[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL
JIRA: https://issues.apache.org/jira/browse/SPARK-2060
Programming guide: http://yhuai.github.io/site/sql-programming-guide.html
Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after the first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or an RDD[String] with one JSON object per string) and return a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
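For context, here is a minimal sketch of how the new Python API introduced by this commit is meant to be used. This is not part of the commit itself; it assumes an existing SparkContext `sc`, and the file path is hypothetical (the field names mirror the doctest fixtures below):

    from pyspark.sql import SQLContext

    sqlCtx = SQLContext(sc)  # assumes a live SparkContext `sc`

    # Load a text file with one JSON object per line; jsonFile scans the
    # entire dataset once to infer the schema.
    srdd = sqlCtx.jsonFile("people.json")  # hypothetical path

    # Register the resulting SchemaRDD as a table and query it with SQL.
    sqlCtx.registerRDDAsTable(srdd, "table1")
    srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")
    print srdd2.collect()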
Diffstat (limited to 'python')
-rw-r--r--    python/pyspark/sql.py    64
1 file changed, 62 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index c31d49ce83..5051c82da3 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, PipelinedRDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
from py4j.protocol import Py4JError
@@ -137,6 +137,53 @@ class SQLContext:
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
+
+ def jsonFile(self, path):
+ """Loads a text file storing one JSON object per line,
+ returning the result as a L{SchemaRDD}.
+ It goes through the entire dataset once to determine the schema.
+
+ >>> import tempfile, shutil
+ >>> jsonFile = tempfile.mkdtemp()
+ >>> shutil.rmtree(jsonFile)
+ >>> ofn = open(jsonFile, 'w')
+ >>> for json in jsonStrings:
+ ... print>>ofn, json
+ >>> ofn.close()
+ >>> srdd = sqlCtx.jsonFile(jsonFile)
+ >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+ >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2, field3 AS f3 FROM table1")
+ >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+ ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+ ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+ True
+ """
+ jschema_rdd = self._ssql_ctx.jsonFile(path)
+ return SchemaRDD(jschema_rdd, self)
+
+ def jsonRDD(self, rdd):
+ """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
+ It goes through the entire dataset once to determine the schema.
+
+ >>> srdd = sqlCtx.jsonRDD(json)
+ >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+ >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2, field3 AS f3 FROM table1")
+ >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
+ ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
+ ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+ True
+ """
+ def func(split, iterator):
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ yield x.encode("utf-8")
+ keyed = PipelinedRDD(rdd, func)
+ keyed._bypass_serializer = True
+ jrdd = keyed._jrdd.map(self._jvm.BytesToString())
+ jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
+ return SchemaRDD(jschema_rdd, self)
+
def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.
@@ -265,7 +312,7 @@ class SchemaRDD(RDD):
For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
L{SchemaRDD} is not operated on directly, as its underlying
- implementation is a RDD composed of Java objects. Instead it is
+ implementation is an RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can
be done.
"""
@@ -337,6 +384,14 @@ class SchemaRDD(RDD):
"""Creates a new table with the contents of this SchemaRDD."""
self._jschema_rdd.saveAsTable(tableName)
+ def schemaString(self):
+ """Returns the output schema in the tree format."""
+ return self._jschema_rdd.schemaString()
+
+ def printSchema(self):
+ """Prints out the schema in the tree format."""
+ print self.schemaString()
+
def count(self):
"""Return the number of elements in this RDD.
@@ -436,6 +491,11 @@ def _test():
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+ jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+ '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
+ '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+ globs['jsonStrings'] = jsonStrings
+ globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
{"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])