diff options
author | Davies Liu <davies@databricks.com> | 2015-02-24 20:51:55 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-24 20:51:55 -0800 |
commit | d641fbb39c90b1d734cc55396ca43d7e98788975 (patch) | |
tree | d9741f7c08d6ba288b224e8c2af60fc1bdb445f3 /python/pyspark/sql/context.py | |
parent | 769e092bdc51582372093f76dbaece27149cc4ea (diff) | |
download | spark-d641fbb39c90b1d734cc55396ca43d7e98788975.tar.gz spark-d641fbb39c90b1d734cc55396ca43d7e98788975.tar.bz2 spark-d641fbb39c90b1d734cc55396ca43d7e98788975.zip |
[SPARK-5994] [SQL] Python DataFrame documentation fixes
select empty should NOT be the same as select. make sure selectExpr is behaving the same.
join param documentation
link to source doesn't work in jekyll generated file
cross reference of columns (i.e. enabling linking)
show(): move df example before df.show()
move tests in SQLContext out of docstring otherwise doc is too long
Column.desc and .asc doesn't have any documentation
in documentation, sort functions.*)
Author: Davies Liu <davies@databricks.com>
Closes #4756 from davies/df_docs and squashes the following commits:
f30502c [Davies Liu] fix doc
32f0d46 [Davies Liu] fix DataFrame docs
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r-- | python/pyspark/sql/context.py | 182 |
1 files changed, 34 insertions, 148 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 125933c9d3..5d7aeb664c 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -129,6 +129,7 @@ class SQLContext(object): >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x)) >>> sqlCtx.sql("SELECT stringLengthString('test')").collect() [Row(c0=u'4')] + >>> from pyspark.sql.types import IntegerType >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect() @@ -197,31 +198,6 @@ class SQLContext(object): >>> df = sqlCtx.inferSchema(rdd) >>> df.collect()[0] Row(field1=1, field2=u'row1') - - >>> NestedRow = Row("f1", "f2") - >>> nestedRdd1 = sc.parallelize([ - ... NestedRow(array('i', [1, 2]), {"row1": 1.0}), - ... NestedRow(array('i', [2, 3]), {"row2": 2.0})]) - >>> df = sqlCtx.inferSchema(nestedRdd1) - >>> df.collect() - [Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})] - - >>> nestedRdd2 = sc.parallelize([ - ... NestedRow([[1, 2], [2, 3]], [1, 2]), - ... NestedRow([[2, 3], [3, 4]], [2, 3])]) - >>> df = sqlCtx.inferSchema(nestedRdd2) - >>> df.collect() - [Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])] - - >>> from collections import namedtuple - >>> CustomRow = namedtuple('CustomRow', 'field1 field2') - >>> rdd = sc.parallelize( - ... [CustomRow(field1=1, field2="row1"), - ... CustomRow(field1=2, field2="row2"), - ... CustomRow(field1=3, field2="row3")]) - >>> df = sqlCtx.inferSchema(rdd) - >>> df.collect()[0] - Row(field1=1, field2=u'row1') """ if isinstance(rdd, DataFrame): @@ -252,56 +228,8 @@ class SQLContext(object): >>> schema = StructType([StructField("field1", IntegerType(), False), ... StructField("field2", StringType(), False)]) >>> df = sqlCtx.applySchema(rdd2, schema) - >>> sqlCtx.registerDataFrameAsTable(df, "table1") - >>> df2 = sqlCtx.sql("SELECT * from table1") - >>> df2.collect() - [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')] - - >>> from datetime import date, datetime - >>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0, - ... date(2010, 1, 1), - ... datetime(2010, 1, 1, 1, 1, 1), - ... {"a": 1}, (2,), [1, 2, 3], None)]) - >>> schema = StructType([ - ... StructField("byte1", ByteType(), False), - ... StructField("byte2", ByteType(), False), - ... StructField("short1", ShortType(), False), - ... StructField("short2", ShortType(), False), - ... StructField("int1", IntegerType(), False), - ... StructField("float1", FloatType(), False), - ... StructField("date1", DateType(), False), - ... StructField("time1", TimestampType(), False), - ... StructField("map1", - ... MapType(StringType(), IntegerType(), False), False), - ... StructField("struct1", - ... StructType([StructField("b", ShortType(), False)]), False), - ... StructField("list1", ArrayType(ByteType(), False), False), - ... StructField("null1", DoubleType(), True)]) - >>> df = sqlCtx.applySchema(rdd, schema) - >>> results = df.map( - ... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1, - ... x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1)) - >>> results.collect()[0] # doctest: +NORMALIZE_WHITESPACE - (127, -128, -32768, 32767, 2147483647, 1.0, datetime.date(2010, 1, 1), - datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None) - - >>> df.registerTempTable("table2") - >>> sqlCtx.sql( - ... "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " + - ... "short1 + 1 AS short1, short2 - 1 AS short2, int1 - 1 AS int1, " + - ... "float1 + 1.5 as float1 FROM table2").collect() - [Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int1=2147483646, float1=2.5)] - - >>> from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type - >>> rdd = sc.parallelize([(127, -32768, 1.0, - ... datetime(2010, 1, 1, 1, 1, 1), - ... {"a": 1}, (2,), [1, 2, 3])]) - >>> abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]" - >>> schema = _parse_schema_abstract(abstract) - >>> typedSchema = _infer_schema_type(rdd.first(), schema) - >>> df = sqlCtx.applySchema(rdd, typedSchema) >>> df.collect() - [Row(byte1=127, short1=-32768, float1=1.0, time1=..., list1=[1, 2, 3])] + [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')] """ if isinstance(rdd, DataFrame): @@ -459,46 +387,28 @@ class SQLContext(object): >>> import tempfile, shutil >>> jsonFile = tempfile.mkdtemp() >>> shutil.rmtree(jsonFile) - >>> ofn = open(jsonFile, 'w') - >>> for json in jsonStrings: - ... print>>ofn, json - >>> ofn.close() + >>> with open(jsonFile, 'w') as f: + ... f.writelines(jsonStrings) >>> df1 = sqlCtx.jsonFile(jsonFile) - >>> sqlCtx.registerDataFrameAsTable(df1, "table1") - >>> df2 = sqlCtx.sql( - ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " - ... "field6 as f4 from table1") - >>> for r in df2.collect(): - ... print r - Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) - Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')]) - Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) - - >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema) - >>> sqlCtx.registerDataFrameAsTable(df3, "table2") - >>> df4 = sqlCtx.sql( - ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " - ... "field6 as f4 from table2") - >>> for r in df4.collect(): - ... print r - Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) - Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')]) - Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) + >>> df1.printSchema() + root + |-- field1: long (nullable = true) + |-- field2: string (nullable = true) + |-- field3: struct (nullable = true) + | |-- field4: long (nullable = true) >>> from pyspark.sql.types import * >>> schema = StructType([ - ... StructField("field2", StringType(), True), + ... StructField("field2", StringType()), ... StructField("field3", - ... StructType([ - ... StructField("field5", - ... ArrayType(IntegerType(), False), True)]), False)]) - >>> df5 = sqlCtx.jsonFile(jsonFile, schema) - >>> sqlCtx.registerDataFrameAsTable(df5, "table3") - >>> df6 = sqlCtx.sql( - ... "SELECT field2 AS f1, field3.field5 as f2, " - ... "field3.field5[0] as f3 from table3") - >>> df6.collect() - [Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)] + ... StructType([StructField("field5", ArrayType(IntegerType()))]))]) + >>> df2 = sqlCtx.jsonFile(jsonFile, schema) + >>> df2.printSchema() + root + |-- field2: string (nullable = true) + |-- field3: struct (nullable = true) + | |-- field5: array (nullable = true) + | | |-- element: integer (containsNull = true) """ if schema is None: df = self._ssql_ctx.jsonFile(path, samplingRatio) @@ -517,48 +427,23 @@ class SQLContext(object): determine the schema. >>> df1 = sqlCtx.jsonRDD(json) - >>> sqlCtx.registerDataFrameAsTable(df1, "table1") - >>> df2 = sqlCtx.sql( - ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " - ... "field6 as f4 from table1") - >>> for r in df2.collect(): - ... print r - Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) - Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')]) - Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) - - >>> df3 = sqlCtx.jsonRDD(json, df1.schema) - >>> sqlCtx.registerDataFrameAsTable(df3, "table2") - >>> df4 = sqlCtx.sql( - ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " - ... "field6 as f4 from table2") - >>> for r in df4.collect(): - ... print r - Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) - Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')]) - Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) + >>> df1.first() + Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None) + + >>> df2 = sqlCtx.jsonRDD(json, df1.schema) + >>> df2.first() + Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None) >>> from pyspark.sql.types import * >>> schema = StructType([ - ... StructField("field2", StringType(), True), + ... StructField("field2", StringType()), ... StructField("field3", - ... StructType([ - ... StructField("field5", - ... ArrayType(IntegerType(), False), True)]), False)]) - >>> df5 = sqlCtx.jsonRDD(json, schema) - >>> sqlCtx.registerDataFrameAsTable(df5, "table3") - >>> df6 = sqlCtx.sql( - ... "SELECT field2 AS f1, field3.field5 as f2, " - ... "field3.field5[0] as f3 from table3") - >>> df6.collect() - [Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)] - - >>> sqlCtx.jsonRDD(sc.parallelize(['{}', - ... '{"key0": {"key1": "value1"}}'])).collect() - [Row(key0=None), Row(key0=Row(key1=u'value1'))] - >>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', - ... '{"key0": {"key1": "value1"}}'])).collect() - [Row(key0=None), Row(key0=Row(key1=u'value1'))] + ... StructType([StructField("field5", ArrayType(IntegerType()))])) + ... ]) + >>> df3 = sqlCtx.jsonRDD(json, schema) + >>> df3.first() + Row(field2=u'row1', field3=Row(field5=None)) + """ def func(iterator): @@ -848,7 +733,8 @@ def _test(): globs['jsonStrings'] = jsonStrings globs['json'] = sc.parallelize(jsonStrings) (failure_count, test_count) = doctest.testmod( - pyspark.sql.context, globs=globs, optionflags=doctest.ELLIPSIS) + pyspark.sql.context, globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) globs['sc'].stop() if failure_count: exit(-1) |