author     Davies Liu <davies@databricks.com>    2014-11-03 23:56:14 -0800
committer  Josh Rosen <joshrosen@databricks.com> 2014-11-03 23:56:14 -0800
commit     e4f42631a68b473ce706429915f3f08042af2119 (patch)
tree       557ff754b9936addfb9628bfcba462802ff6ec1c /python/pyspark/sql.py
parent     b671ce047d036b8923007902826038b01e836e8a (diff)
[SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplifies the serializer: it always uses a batched serializer (AutoBatchedSerializer by default), even when the batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
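For context, a minimal sketch of what the swap amounts to on the PySpark side, assuming the pyspark.serializers API as of this commit (the batch-size behavior described in the comments reflects the library defaults as I understand them, not anything taken from this diff):

    from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer

    # Before: pickle rows in fixed-size batches of 1024 objects each.
    fixed = BatchedSerializer(PickleSerializer(), 1024)

    # After: start with a small batch and let the serializer grow or shrink
    # the batch count automatically, aiming at a roughly constant number of
    # serialized bytes per batch (about 64 KB by default).
    auto = AutoBatchedSerializer(PickleSerializer())

The diff below applies this swap in SQLContext.registerFunction and in SchemaRDD's _jrdd deserializer.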
Diffstat (limited to 'python/pyspark/sql.py')
-rw-r--r--   python/pyspark/sql.py   18
1 file changed, 7 insertions, 11 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d16c18bc79..e5d62a466c 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -44,7 +44,8 @@ from py4j.protocol import Py4JError
from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
+ CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -1233,7 +1234,6 @@ class SQLContext(object):
self._sc = sparkContext
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
- self._pythonToJava = self._jvm.PythonRDD.pythonToJavaArray
self._scala_SQLContext = sqlContext
@property
@@ -1263,8 +1263,8 @@ class SQLContext(object):
"""
func = lambda _, it: imap(lambda x: f(*x), it)
command = (func, None,
- BatchedSerializer(PickleSerializer(), 1024),
- BatchedSerializer(PickleSerializer(), 1024))
+ AutoBatchedSerializer(PickleSerializer()),
+ AutoBatchedSerializer(PickleSerializer()))
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
@@ -1443,8 +1443,7 @@ class SQLContext(object):
converter = _python_to_sql_converter(schema)
rdd = rdd.map(converter)
- batched = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- jrdd = self._pythonToJava(rdd._jrdd, batched)
+ jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return SchemaRDD(srdd.toJavaSchemaRDD(), self)
@@ -1841,7 +1840,7 @@ class SchemaRDD(RDD):
self.is_checkpointed = False
self.ctx = self.sql_ctx._sc
# the _jrdd is created by javaToPython(), serialized by pickle
- self._jrdd_deserializer = BatchedSerializer(PickleSerializer())
+ self._jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
@property
def _jrdd(self):
@@ -2071,16 +2070,13 @@ class SchemaRDD(RDD):
def _test():
import doctest
- from array import array
from pyspark.context import SparkContext
# let doctest run in pyspark.sql, so DataTypes can be picklable
import pyspark.sql
from pyspark.sql import Row, SQLContext
from pyspark.tests import ExamplePoint, ExamplePointUDT
globs = pyspark.sql.__dict__.copy()
- # The small batch size here ensures that we see multiple batches,
- # even in these small test examples:
- sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize(