Diffstat (limited to 'python/pyspark/sql.py')
-rw-r--r--  python/pyspark/sql.py  18
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index d16c18bc79..e5d62a466c 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -44,7 +44,8 @@ from py4j.protocol import Py4JError
from py4j.java_collections import ListConverter, MapConverter
from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
+from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
+ CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
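This hunk swaps the fixed-size BatchedSerializer usage for AutoBatchedSerializer, which tunes the batch size at runtime instead of pickling a hard-coded 1024 rows per chunk. A minimal usage sketch, assuming the pyspark.serializers API of this version (the target batch byte size is the class default, roughly 64 KB, not something set here):

    from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

    # Fixed batching pickles a constant number of rows per chunk, which is
    # wasteful for tiny rows and memory-hungry for very wide ones.
    # Adaptive batching grows the batch until each pickled chunk approaches
    # a target byte size, so throughput stays stable regardless of row width.
    ser = AutoBatchedSerializer(PickleSerializer())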
@@ -1233,7 +1234,6 @@ class SQLContext(object):
self._sc = sparkContext
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
- self._pythonToJava = self._jvm.PythonRDD.pythonToJavaArray
self._scala_SQLContext = sqlContext
@property
@@ -1263,8 +1263,8 @@ class SQLContext(object):
"""
func = lambda _, it: imap(lambda x: f(*x), it)
command = (func, None,
- BatchedSerializer(PickleSerializer(), 1024),
- BatchedSerializer(PickleSerializer(), 1024))
+ AutoBatchedSerializer(PickleSerializer()),
+ AutoBatchedSerializer(PickleSerializer()))
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20): # 1M
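The registered function is shipped to executors as a pickled command tuple; the hunk only changes the serializers inside that tuple, while the surrounding size check stays the same. A hedged sketch of the oversized-command path (assumption: commands beyond 1 MiB are wrapped in a broadcast variable, as the truncated if-branch suggests; the SparkContext and placeholder command are illustrative):

    from pyspark import SparkContext
    from pyspark.serializers import CloudPickleSerializer

    sc = SparkContext('local[2]', 'udf-demo')
    command = (lambda _, it: it, None, None, None)  # placeholder command tuple

    ser = CloudPickleSerializer()
    pickled_command = ser.dumps(command)
    if len(pickled_command) > (1 << 20):  # 1 MiB, matching the check above
        # assumption: large commands ride a broadcast variable instead of
        # being resent inline with every task
        broadcast = sc.broadcast(pickled_command)
        pickled_command = ser.dumps(broadcast)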
@@ -1443,8 +1443,7 @@ class SQLContext(object):
converter = _python_to_sql_converter(schema)
rdd = rdd.map(converter)
- batched = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
- jrdd = self._pythonToJava(rdd._jrdd, batched)
+ jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return SchemaRDD(srdd.toJavaSchemaRDD(), self)
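Here the Python-side bookkeeping (was the RDD batch-serialized or not?) disappears: the RDD is first converted to a JVM RDD of Java objects and then flattened with SerDeUtil.toJavaArray, so no batched flag needs to be threaded through. A minimal end-to-end sketch that exercises this path, assuming the 1.x-era pyspark.sql API defined in this file:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, StructType, StructField, StringType

    sc = SparkContext('local[2]', 'schema-demo')
    sqlCtx = SQLContext(sc)
    rdd = sc.parallelize([('Alice',), ('Bob',)])
    schema = StructType([StructField('name', StringType(), True)])
    # applySchema converts through _to_java_object_rdd() internally
    srdd = sqlCtx.applySchema(rdd, schema)
    print(srdd.collect())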
@@ -1841,7 +1840,7 @@ class SchemaRDD(RDD):
self.is_checkpointed = False
self.ctx = self.sql_ctx._sc
# the _jrdd is created by javaToPython(), serialized by pickle
- self._jrdd_deserializer = BatchedSerializer(PickleSerializer())
+ self._jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
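SchemaRDD's deserializer changes in lockstep. Reading is batch-size agnostic: both serializers frame the stream as pickled lists, so a reader built on AutoBatchedSerializer can consume batches of any size. A small sketch demonstrating that, assuming the BatchedSerializer/AutoBatchedSerializer framing in pyspark.serializers:

    import io
    from pyspark.serializers import (AutoBatchedSerializer, BatchedSerializer,
                                     PickleSerializer)

    buf = io.BytesIO()
    # Write with small fixed batches...
    BatchedSerializer(PickleSerializer(), 2).dump_stream(range(5), buf)
    buf.seek(0)
    # ...and read with the auto-batched deserializer: same framing, same items.
    print(list(AutoBatchedSerializer(PickleSerializer()).load_stream(buf)))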
@property
def _jrdd(self):
@@ -2071,16 +2070,13 @@ class SchemaRDD(RDD):
def _test():
import doctest
- from array import array
from pyspark.context import SparkContext
# let doctest run in pyspark.sql, so DataTypes can be picklable
import pyspark.sql
from pyspark.sql import Row, SQLContext
from pyspark.tests import ExamplePoint, ExamplePointUDT
globs = pyspark.sql.__dict__.copy()
- # The small batch size here ensures that we see multiple batches,
- # even in these small test examples:
- sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize(