aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/sql.py')
-rw-r--r--python/pyspark/sql.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 8f6dbab240..42a9920f10 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -27,7 +27,7 @@ import warnings
from array import array
from operator import itemgetter
-from pyspark.rdd import RDD, PipelinedRDD
+from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -975,7 +975,11 @@ class SQLContext(object):
command = (func,
BatchedSerializer(PickleSerializer(), 1024),
BatchedSerializer(PickleSerializer(), 1024))
- pickled_command = CloudPickleSerializer().dumps(command)
+ ser = CloudPickleSerializer()
+ pickled_command = ser.dumps(command)
+ if pickled_command > (1 << 20): # 1M
+ broadcast = self._sc.broadcast(pickled_command)
+ pickled_command = ser.dumps(broadcast)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self._sc._pickled_broadcast_vars],
self._sc._gateway._gateway_client)