author     Davies Liu <davies@databricks.com>         2015-02-24 20:51:55 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-24 20:51:55 -0800
commit     d641fbb39c90b1d734cc55396ca43d7e98788975 (patch)
tree       d9741f7c08d6ba288b224e8c2af60fc1bdb445f3 /python/pyspark/sql/context.py
parent     769e092bdc51582372093f76dbaece27149cc4ea (diff)
[SPARK-5994] [SQL] Python DataFrame documentation fixes
select empty should NOT be the same as select. make sure selectExpr is behaving the same.
join param documentation
link to source doesn't work in jekyll generated file
cross reference of columns (i.e. enabling linking)
show(): move df example before df.show()
move tests in SQLContext out of docstring, otherwise doc is too long
Column.desc and .asc don't have any documentation
in documentation, sort functions.*)

Author: Davies Liu <davies@databricks.com>

Closes #4756 from davies/df_docs and squashes the following commits:

f30502c [Davies Liu] fix doc
32f0d46 [Davies Liu] fix DataFrame docs
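For context, here is a minimal standalone sketch of the compact doctest style this change keeps in the docstrings (it assumes a running SparkContext bound to sc and the json RDD of JSON strings that _test() sets up; the variable names below are illustrative only):

    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, StringType

    sqlCtx = SQLContext(sc)        # sc: an already-running SparkContext
    df1 = sqlCtx.jsonRDD(json)     # json: an RDD of JSON strings; the schema is inferred
    df1.first()                    # expected to look like Row(field1=1, field2=u'row1', ...)

    # Read the same RDD again with an explicit (partial) schema instead of inferring it.
    schema = StructType([StructField("field2", StringType())])
    df2 = sqlCtx.jsonRDD(json, schema)
    df2.first()                    # expected to look like Row(field2=u'row1')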
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r--  python/pyspark/sql/context.py  182
1 file changed, 34 insertions, 148 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 125933c9d3..5d7aeb664c 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -129,6 +129,7 @@ class SQLContext(object):
>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
+
>>> from pyspark.sql.types import IntegerType
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
@@ -197,31 +198,6 @@ class SQLContext(object):
>>> df = sqlCtx.inferSchema(rdd)
>>> df.collect()[0]
Row(field1=1, field2=u'row1')
-
- >>> NestedRow = Row("f1", "f2")
- >>> nestedRdd1 = sc.parallelize([
- ... NestedRow(array('i', [1, 2]), {"row1": 1.0}),
- ... NestedRow(array('i', [2, 3]), {"row2": 2.0})])
- >>> df = sqlCtx.inferSchema(nestedRdd1)
- >>> df.collect()
- [Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})]
-
- >>> nestedRdd2 = sc.parallelize([
- ... NestedRow([[1, 2], [2, 3]], [1, 2]),
- ... NestedRow([[2, 3], [3, 4]], [2, 3])])
- >>> df = sqlCtx.inferSchema(nestedRdd2)
- >>> df.collect()
- [Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])]
-
- >>> from collections import namedtuple
- >>> CustomRow = namedtuple('CustomRow', 'field1 field2')
- >>> rdd = sc.parallelize(
- ... [CustomRow(field1=1, field2="row1"),
- ... CustomRow(field1=2, field2="row2"),
- ... CustomRow(field1=3, field2="row3")])
- >>> df = sqlCtx.inferSchema(rdd)
- >>> df.collect()[0]
- Row(field1=1, field2=u'row1')
"""
if isinstance(rdd, DataFrame):
@@ -252,56 +228,8 @@ class SQLContext(object):
>>> schema = StructType([StructField("field1", IntegerType(), False),
... StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
- >>> sqlCtx.registerDataFrameAsTable(df, "table1")
- >>> df2 = sqlCtx.sql("SELECT * from table1")
- >>> df2.collect()
- [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
-
- >>> from datetime import date, datetime
- >>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0,
- ... date(2010, 1, 1),
- ... datetime(2010, 1, 1, 1, 1, 1),
- ... {"a": 1}, (2,), [1, 2, 3], None)])
- >>> schema = StructType([
- ... StructField("byte1", ByteType(), False),
- ... StructField("byte2", ByteType(), False),
- ... StructField("short1", ShortType(), False),
- ... StructField("short2", ShortType(), False),
- ... StructField("int1", IntegerType(), False),
- ... StructField("float1", FloatType(), False),
- ... StructField("date1", DateType(), False),
- ... StructField("time1", TimestampType(), False),
- ... StructField("map1",
- ... MapType(StringType(), IntegerType(), False), False),
- ... StructField("struct1",
- ... StructType([StructField("b", ShortType(), False)]), False),
- ... StructField("list1", ArrayType(ByteType(), False), False),
- ... StructField("null1", DoubleType(), True)])
- >>> df = sqlCtx.applySchema(rdd, schema)
- >>> results = df.map(
- ... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int1, x.float1, x.date1,
- ... x.time1, x.map1["a"], x.struct1.b, x.list1, x.null1))
- >>> results.collect()[0] # doctest: +NORMALIZE_WHITESPACE
- (127, -128, -32768, 32767, 2147483647, 1.0, datetime.date(2010, 1, 1),
- datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
-
- >>> df.registerTempTable("table2")
- >>> sqlCtx.sql(
- ... "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " +
- ... "short1 + 1 AS short1, short2 - 1 AS short2, int1 - 1 AS int1, " +
- ... "float1 + 1.5 as float1 FROM table2").collect()
- [Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int1=2147483646, float1=2.5)]
-
- >>> from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type
- >>> rdd = sc.parallelize([(127, -32768, 1.0,
- ... datetime(2010, 1, 1, 1, 1, 1),
- ... {"a": 1}, (2,), [1, 2, 3])])
- >>> abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
- >>> schema = _parse_schema_abstract(abstract)
- >>> typedSchema = _infer_schema_type(rdd.first(), schema)
- >>> df = sqlCtx.applySchema(rdd, typedSchema)
>>> df.collect()
- [Row(byte1=127, short1=-32768, float1=1.0, time1=..., list1=[1, 2, 3])]
+ [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
"""
if isinstance(rdd, DataFrame):
@@ -459,46 +387,28 @@ class SQLContext(object):
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
- >>> ofn = open(jsonFile, 'w')
- >>> for json in jsonStrings:
- ... print>>ofn, json
- >>> ofn.close()
+ >>> with open(jsonFile, 'w') as f:
+ ... f.writelines(jsonStrings)
>>> df1 = sqlCtx.jsonFile(jsonFile)
- >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
- >>> df2 = sqlCtx.sql(
- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
- ... "field6 as f4 from table1")
- >>> for r in df2.collect():
- ... print r
- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
- Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
-
- >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
- >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
- >>> df4 = sqlCtx.sql(
- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
- ... "field6 as f4 from table2")
- >>> for r in df4.collect():
- ... print r
- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
- Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
+ >>> df1.printSchema()
+ root
+ |-- field1: long (nullable = true)
+ |-- field2: string (nullable = true)
+ |-- field3: struct (nullable = true)
+ | |-- field4: long (nullable = true)
>>> from pyspark.sql.types import *
>>> schema = StructType([
- ... StructField("field2", StringType(), True),
+ ... StructField("field2", StringType()),
... StructField("field3",
- ... StructType([
- ... StructField("field5",
- ... ArrayType(IntegerType(), False), True)]), False)])
- >>> df5 = sqlCtx.jsonFile(jsonFile, schema)
- >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
- >>> df6 = sqlCtx.sql(
- ... "SELECT field2 AS f1, field3.field5 as f2, "
- ... "field3.field5[0] as f3 from table3")
- >>> df6.collect()
- [Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)]
+ ... StructType([StructField("field5", ArrayType(IntegerType()))]))])
+ >>> df2 = sqlCtx.jsonFile(jsonFile, schema)
+ >>> df2.printSchema()
+ root
+ |-- field2: string (nullable = true)
+ |-- field3: struct (nullable = true)
+ | |-- field5: array (nullable = true)
+ | | |-- element: integer (containsNull = true)
"""
if schema is None:
df = self._ssql_ctx.jsonFile(path, samplingRatio)
@@ -517,48 +427,23 @@ class SQLContext(object):
determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
- >>> sqlCtx.registerDataFrameAsTable(df1, "table1")
- >>> df2 = sqlCtx.sql(
- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
- ... "field6 as f4 from table1")
- >>> for r in df2.collect():
- ... print r
- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
- Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
-
- >>> df3 = sqlCtx.jsonRDD(json, df1.schema)
- >>> sqlCtx.registerDataFrameAsTable(df3, "table2")
- >>> df4 = sqlCtx.sql(
- ... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
- ... "field6 as f4 from table2")
- >>> for r in df4.collect():
- ... print r
- Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None)
- Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
- Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
+ >>> df1.first()
+ Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
+
+ >>> df2 = sqlCtx.jsonRDD(json, df1.schema)
+ >>> df2.first()
+ Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> from pyspark.sql.types import *
>>> schema = StructType([
- ... StructField("field2", StringType(), True),
+ ... StructField("field2", StringType()),
... StructField("field3",
- ... StructType([
- ... StructField("field5",
- ... ArrayType(IntegerType(), False), True)]), False)])
- >>> df5 = sqlCtx.jsonRDD(json, schema)
- >>> sqlCtx.registerDataFrameAsTable(df5, "table3")
- >>> df6 = sqlCtx.sql(
- ... "SELECT field2 AS f1, field3.field5 as f2, "
- ... "field3.field5[0] as f3 from table3")
- >>> df6.collect()
- [Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)]
-
- >>> sqlCtx.jsonRDD(sc.parallelize(['{}',
- ... '{"key0": {"key1": "value1"}}'])).collect()
- [Row(key0=None), Row(key0=Row(key1=u'value1'))]
- >>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}',
- ... '{"key0": {"key1": "value1"}}'])).collect()
- [Row(key0=None), Row(key0=Row(key1=u'value1'))]
+ ... StructType([StructField("field5", ArrayType(IntegerType()))]))
+ ... ])
+ >>> df3 = sqlCtx.jsonRDD(json, schema)
+ >>> df3.first()
+ Row(field2=u'row1', field3=Row(field5=None))
+
"""
def func(iterator):
@@ -848,7 +733,8 @@ def _test():
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
(failure_count, test_count) = doctest.testmod(
- pyspark.sql.context, globs=globs, optionflags=doctest.ELLIPSIS)
+ pyspark.sql.context, globs=globs,
+ optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
globs['sc'].stop()
if failure_count:
exit(-1)
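The last hunk enables doctest.NORMALIZE_WHITESPACE alongside ELLIPSIS so that long expected values (printed schemas, Rows) can wrap across lines in the docstrings. A minimal, self-contained sketch of what that flag does (plain doctest, nothing PySpark-specific):

    import doctest

    def double_all(xs):
        """
        With NORMALIZE_WHITESPACE the expected output may wrap across lines:

        >>> double_all(range(5))
        [0, 2,
         4, 6, 8]
        """
        return [x * 2 for x in xs]

    # Runs the docstring example above; without NORMALIZE_WHITESPACE the wrapped
    # expected output would not match the single-line actual output.
    doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)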