diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/sql.py | 22 |
1 files changed, 21 insertions, 1 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index e344610b1f..c31d49ce83 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -77,12 +77,25 @@ class SQLContext: """Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to determine the fields names - and types, and then use that to extract all the dictionaries. + and types, and then use that to extract all the dictionaries. Nested + collections are supported, which include array, dict, list, set, and + tuple. >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, ... {"field1" : 3, "field2": "row3"}] True + + >>> from array import array + >>> srdd = sqlCtx.inferSchema(nestedRdd1) + >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, + ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}] + True + + >>> srdd = sqlCtx.inferSchema(nestedRdd2) + >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, + ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}] + True """ if (rdd.__class__ is SchemaRDD): raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__) @@ -413,6 +426,7 @@ class SchemaRDD(RDD): def _test(): import doctest + from array import array from pyspark.context import SparkContext globs = globals().copy() # The small batch size here ensures that we see multiple batches, @@ -422,6 +436,12 @@ def _test(): globs['sqlCtx'] = SQLContext(sc) globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}]) + globs['nestedRdd1'] = sc.parallelize([ + {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}}, + {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]) + globs['nestedRdd2'] = sc.parallelize([ + {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)}, + {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]) (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: |