author    Davies Liu <davies@databricks.com>    2015-07-09 14:43:38 -0700
committer Davies Liu <davies.liu@gmail.com>    2015-07-09 14:43:38 -0700
commit    c9e2ef52bb54f35a904427389dc492d61f29b018 (patch)
tree      90887ae7055aa78751561119083bd09ac099e0f4 /python/pyspark/sql/dataframe.py
parent    3ccebf36c5abe04702d4cf223552a94034d980fb (diff)
[SPARK-7902] [SPARK-6289] [SPARK-8685] [SQL] [PYSPARK] Refactor of serialization for Python DataFrame
This PR fixes the long-standing serialization issue between Python RDDs and DataFrames. It switches to a customized Pickler for InternalRow, enabling customized unpickling (type conversion, especially for UDTs), so UDTs are now supported in UDFs. There is no generated `Row` class anymore. cc mengxr

Author: Davies Liu <davies@databricks.com>

Closes #7301 from davies/sql_ser and squashes the following commits:

81bef71 [Davies Liu] address comments
e9217bd [Davies Liu] add regression tests
db34167 [Davies Liu] Refactor of serialization for Python DataFrame
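The unpickling hook at the heart of this change can be sketched in plain Python. The following is a minimal illustration of the mechanism only; `Row` and `_create_row` here are simplified stand-ins, not the real pyspark.sql implementations, and the actual PR registers the customized Pickler for InternalRow on the JVM side:

import pickle

class Row(tuple):
    # Simplified stand-in for pyspark.sql.Row: a tuple with named fields.
    def __new__(cls, names, values):
        row = tuple.__new__(cls, values)
        row.__fields__ = names
        return row

    def __reduce__(self):
        # Pickle as a call to the module-level reconstructor below; this is
        # the stream shape a JVM-side Pickler can emit directly, so
        # unpickling yields a ready-made Row with no extra wrapping pass.
        return (_create_row, (self.__fields__, tuple(self)))

def _create_row(names, values):
    return Row(names, values)

r = Row(("id", "name"), (1, "spark"))
r2 = pickle.loads(pickle.dumps(r))
assert r2 == (1, "spark") and r2.__fields__ == ("id", "name")

Because the reconstructor runs at load time, per-schema type conversion (including UDT deserialization) can happen inside it, which is why the mapPartitions/applySchema pass removed in the diff below is no longer needed.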
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r--  python/pyspark/sql/dataframe.py | 16 +++-------------
1 file changed, 3 insertions(+), 13 deletions(-)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 1e9c657cf8..83e02b85f0 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -31,7 +31,7 @@ from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deseria
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.sql import since
-from pyspark.sql.types import _create_cls, _parse_datatype_json_string
+from pyspark.sql.types import _parse_datatype_json_string
from pyspark.sql.column import Column, _to_seq, _to_java_column
from pyspark.sql.readwriter import DataFrameWriter
from pyspark.sql.types import *
@@ -83,15 +83,7 @@ class DataFrame(object):
"""
if self._lazy_rdd is None:
jrdd = self._jdf.javaToPython()
- rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
- schema = self.schema
-
- def applySchema(it):
- cls = _create_cls(schema)
- return map(cls, it)
-
- self._lazy_rdd = rdd.mapPartitions(applySchema)
-
+ self._lazy_rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
return self._lazy_rdd
@property
@@ -287,9 +279,7 @@ class DataFrame(object):
"""
with SCCallSiteSync(self._sc) as css:
port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
- rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
- cls = _create_cls(self.schema)
- return [cls(r) for r in rs]
+ return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
@ignore_unicode_prefix
@since(1.3)
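For reference, a hedged usage sketch against the SQLContext API of this era (the example data is made up); it shows the user-visible effect of the refactor: `collect()` and `.rdd` now deserialize straight into Row objects, with no client-side `_create_cls` wrapping step:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "sql-ser-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([Row(id=1, name="a"), Row(id=2, name="b")])

# Both paths below yield fully converted Row objects directly from the
# batched pickle stream produced by the JVM.
print(df.collect())                       # [Row(id=1, name=u'a'), ...]
print(df.rdd.map(lambda r: r.id).sum())   # 3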