aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-18 18:11:48 -0700
committerJosh Rosen <joshrosen@apache.org>2014-09-18 18:11:48 -0700
commite77fa81a61798c89d5a9b6c9dc067d11785254b7 (patch)
tree2d84f29922e4523f223baff1c84573754c1cf0c7 /python/pyspark/rdd.py
parent9306297d1d888d0430f79b2133ee7377871a3a18 (diff)
downloadspark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.tar.gz
spark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.tar.bz2
spark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.zip
[SPARK-3554] [PySpark] use broadcast automatically for large closure
Py4j can not handle large string efficiently, so we should use broadcast for large closure automatically. (Broadcast use local filesystem to pass through data). Author: Davies Liu <davies.liu@gmail.com> Closes #2417 from davies/command and squashes the following commits: fbf4e97 [Davies Liu] bugfix aefd508 [Davies Liu] use broadcast automatically for large closure
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py4
1 files changed, 4 insertions, 0 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index cb09c191be..b43606b730 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2061,8 +2061,12 @@ class PipelinedRDD(RDD):
self._jrdd_deserializer = NoOpSerializer()
command = (self.func, self._prev_jrdd_deserializer,
self._jrdd_deserializer)
+ # the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
+ if pickled_command > (1 << 20): # 1M
+ broadcast = self.ctx.broadcast(pickled_command)
+ pickled_command = ser.dumps(broadcast)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)