author    Gabe Mulley <gabe@edx.org>    2015-01-12 21:44:51 -0800
committer Michael Armbrust <michael@databricks.com>    2015-01-12 21:44:51 -0800
commit    1e42e96ece9e35ceed9ddebef66d589016878b56 (patch)
tree      fb3f72de1bbaa475510044333e15f18e54ff3639 /python
parent    5d9fa550820543ee1b0ce82997917745973a5d65 (diff)
[SPARK-5138][SQL] Ensure schema can be inferred from a namedtuple
When attempting to infer the schema of an RDD that contains namedtuples, pyspark fails to identify the records as namedtuples, resulting in it raising an error.

Example:

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from collections import namedtuple
import os

sc = SparkContext()
rdd = sc.textFile(os.path.join(os.getenv('SPARK_HOME'), 'README.md'))
TextLine = namedtuple('TextLine', 'line length')
tuple_rdd = rdd.map(lambda l: TextLine(line=l, length=len(l)))
tuple_rdd.take(5)  # This works

sqlc = SQLContext(sc)

# The following line raises an error
schema_rdd = sqlc.inferSchema(tuple_rdd)
```

The error raised is:

```
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 107, in main
    process()
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 227, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.py", line 1107, in takeUpToNumLeft
    yield next(iterator)
  File "/opt/spark-1.2.0-bin-hadoop2.4/python/pyspark/sql.py", line 816, in convert_struct
    raise ValueError("unexpected tuple: %s" % obj)
TypeError: not all arguments converted during string formatting
```

Author: Gabe Mulley <gabe@edx.org>

Closes #3978 from mulby/inferschema-namedtuple and squashes the following commits:

98c61cc [Gabe Mulley] Ensure exception message is populated correctly
375d96b [Gabe Mulley] Ensure schema can be inferred from a namedtuple
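For context, the two underlying pitfalls the patch fixes can be reproduced in plain Python with no Spark installation; the snippet below is a minimal illustrative sketch (the `TextLine` name is reused from the example above):

```python
from collections import namedtuple

TextLine = namedtuple('TextLine', 'line length')
record = TextLine(line="hello", length=5)

# Pitfall 1: namedtuples expose their field names via the `_fields`
# attribute, not `fields`, so the old hasattr(obj, "fields") check
# never matched a namedtuple.
print(hasattr(record, "fields"))   # False
print(record._fields)              # ('line', 'length')

# Pitfall 2: the %-operator treats a bare tuple as the whole argument
# list, so a 2-tuple against a single %s placeholder raises TypeError,
# masking the ValueError the code intended to raise.
try:
    "unexpected tuple: %s" % record
except TypeError as e:
    print(e)  # not all arguments converted during string formatting

# Wrapping the tuple in str() makes it a single argument, so the
# error message is populated correctly:
print("unexpected tuple: %s" % str(record))
```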
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/sql.py | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 0e8b398fc6..014ac1791c 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -807,14 +807,14 @@ def _create_converter(dataType):
             return
 
         if isinstance(obj, tuple):
-            if hasattr(obj, "fields"):
-                d = dict(zip(obj.fields, obj))
-            if hasattr(obj, "__FIELDS__"):
+            if hasattr(obj, "_fields"):
+                d = dict(zip(obj._fields, obj))
+            elif hasattr(obj, "__FIELDS__"):
                 d = dict(zip(obj.__FIELDS__, obj))
             elif all(isinstance(x, tuple) and len(x) == 2 for x in obj):
                 d = dict(obj)
             else:
-                raise ValueError("unexpected tuple: %s" % obj)
+                raise ValueError("unexpected tuple: %s" % str(obj))
 
         elif isinstance(obj, dict):
             d = obj
@@ -1327,6 +1327,16 @@ class SQLContext(object):
         >>> srdd = sqlCtx.inferSchema(nestedRdd2)
         >>> srdd.collect()
         [Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])]
+
+        >>> from collections import namedtuple
+        >>> CustomRow = namedtuple('CustomRow', 'field1 field2')
+        >>> rdd = sc.parallelize(
+        ...     [CustomRow(field1=1, field2="row1"),
+        ...      CustomRow(field1=2, field2="row2"),
+        ...      CustomRow(field1=3, field2="row3")])
+        >>> srdd = sqlCtx.inferSchema(rdd)
+        >>> srdd.collect()[0]
+        Row(field1=1, field2=u'row1')
         """
         if isinstance(rdd, SchemaRDD):