about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/sql/context.py
diff options
context:
space:
mode:
author: Davies Liu <davies@databricks.com> 2015-02-20 15:35:05 -0800
committer: Michael Armbrust <michael@databricks.com> 2015-02-20 15:35:05 -0800
commit: 5b0a42cb17b840c82d3f8a5ad061d99e261ceadf (patch)
tree: dbdc285db33b30e2797400373b43568673d4741c /python/pyspark/sql/context.py
parent: 4a17eedb16343413e5b6f8bb58c6da8952ee7ab6 (diff)
download: spark-5b0a42cb17b840c82d3f8a5ad061d99e261ceadf.tar.gz
spark-5b0a42cb17b840c82d3f8a5ad061d99e261ceadf.tar.bz2
spark-5b0a42cb17b840c82d3f8a5ad061d99e261ceadf.zip
[SPARK-5898] [SPARK-5896] [SQL] [PySpark] create DataFrame from pandas and tuple/list
Fix createDataFrame() from a pandas DataFrame (not tested by Jenkins; depends on SPARK-5693). It also supports creating a DataFrame from plain tuples/lists without column names, in which case `_1`, `_2`, ... are used as the column names. Author: Davies Liu <davies@databricks.com> Closes #4679 from davies/pandas and squashes the following commits: c0cbe0b [Davies Liu] fix tests 8466d1d [Davies Liu] fix create DataFrame from pandas
Diffstat (limited to 'python/pyspark/sql/context.py')
-rw-r--r-- python/pyspark/sql/context.py 12
1 files changed, 10 insertions, 2 deletions
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 3f168f718b..313f15e6d9 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -351,6 +351,8 @@ class SQLContext(object):
:return: a DataFrame
>>> l = [('Alice', 1)]
+ >>> sqlCtx.createDataFrame(l).collect()
+ [Row(_1=u'Alice', _2=1)]
>>> sqlCtx.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
@@ -359,6 +361,8 @@ class SQLContext(object):
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
+ >>> sqlCtx.createDataFrame(rdd).collect()
+ [Row(_1=u'Alice', _2=1)]
>>> df = sqlCtx.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
@@ -377,14 +381,17 @@ class SQLContext(object):
>>> df3 = sqlCtx.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
+
+ >>> sqlCtx.createDataFrame(df.toPandas()).collect() # doctest: +SKIP
+ [Row(name=u'Alice', age=1)]
"""
if isinstance(data, DataFrame):
raise TypeError("data is already a DataFrame")
if has_pandas and isinstance(data, pandas.DataFrame):
- data = self._sc.parallelize(data.to_records(index=False))
if schema is None:
schema = list(data.columns)
+ data = [r.tolist() for r in data.to_records(index=False)]
if not isinstance(data, RDD):
try:
@@ -399,7 +406,8 @@ class SQLContext(object):
if isinstance(schema, (list, tuple)):
first = data.first()
if not isinstance(first, (list, tuple)):
- raise ValueError("each row in `rdd` should be list or tuple")
+ raise ValueError("each row in `rdd` should be list or tuple, "
+ "but got %r" % type(first))
row_cls = Row(*schema)
schema = self._inferSchema(data.map(lambda r: row_cls(*r)), samplingRatio)