diff options
author | Davies Liu <davies@databricks.com> | 2015-07-09 14:43:38 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-07-09 14:43:38 -0700 |
commit | c9e2ef52bb54f35a904427389dc492d61f29b018 (patch) | |
tree | 90887ae7055aa78751561119083bd09ac099e0f4 /python/pyspark/sql/dataframe.py | |
parent | 3ccebf36c5abe04702d4cf223552a94034d980fb (diff) | |
download | spark-c9e2ef52bb54f35a904427389dc492d61f29b018.tar.gz spark-c9e2ef52bb54f35a904427389dc492d61f29b018.tar.bz2 spark-c9e2ef52bb54f35a904427389dc492d61f29b018.zip |
[SPARK-7902] [SPARK-6289] [SPARK-8685] [SQL] [PYSPARK] Refactor of serialization for Python DataFrame
This PR fix the long standing issue of serialization between Python RDD and DataFrame, it change to using a customized Pickler for InternalRow to enable customized unpickling (type conversion, especially for UDT), now we can support UDT for UDF, cc mengxr .
There is no generated `Row` anymore.
Author: Davies Liu <davies@databricks.com>
Closes #7301 from davies/sql_ser and squashes the following commits:
81bef71 [Davies Liu] address comments
e9217bd [Davies Liu] add regression tests
db34167 [Davies Liu] Refactor of serialization for Python DataFrame
Diffstat (limited to 'python/pyspark/sql/dataframe.py')
-rw-r--r-- | python/pyspark/sql/dataframe.py | 16 |
1 files changed, 3 insertions, 13 deletions
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 1e9c657cf8..83e02b85f0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -31,7 +31,7 @@ from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deseria from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql import since -from pyspark.sql.types import _create_cls, _parse_datatype_json_string +from pyspark.sql.types import _parse_datatype_json_string from pyspark.sql.column import Column, _to_seq, _to_java_column from pyspark.sql.readwriter import DataFrameWriter from pyspark.sql.types import * @@ -83,15 +83,7 @@ class DataFrame(object): """ if self._lazy_rdd is None: jrdd = self._jdf.javaToPython() - rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) - schema = self.schema - - def applySchema(it): - cls = _create_cls(schema) - return map(cls, it) - - self._lazy_rdd = rdd.mapPartitions(applySchema) - + self._lazy_rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) return self._lazy_rdd @property @@ -287,9 +279,7 @@ class DataFrame(object): """ with SCCallSiteSync(self._sc) as css: port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd()) - rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) - cls = _create_cls(self.schema) - return [cls(r) for r in rs] + return list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) @ignore_unicode_prefix @since(1.3) |