diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-09-18 18:11:48 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-18 18:11:48 -0700 |
commit | e77fa81a61798c89d5a9b6c9dc067d11785254b7 (patch) | |
tree | 2d84f29922e4523f223baff1c84573754c1cf0c7 /python/pyspark/worker.py | |
parent | 9306297d1d888d0430f79b2133ee7377871a3a18 (diff) | |
download | spark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.tar.gz spark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.tar.bz2 spark-e77fa81a61798c89d5a9b6c9dc067d11785254b7.zip |
[SPARK-3554] [PySpark] use broadcast automatically for large closure
Py4j can not handle large string efficiently, so we should use broadcast for large closure automatically. (Broadcast use local filesystem to pass through data).
Author: Davies Liu <davies.liu@gmail.com>
Closes #2417 from davies/command and squashes the following commits:
fbf4e97 [Davies Liu] bugfix
aefd508 [Davies Liu] use broadcast automatically for large closure
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 252176ac65..d6c06e2dbe 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -77,10 +77,12 @@ def main(infile, outfile): _broadcastRegistry[bid] = Broadcast(bid, value) else: bid = - bid - 1 - _broadcastRegistry.remove(bid) + _broadcastRegistry.pop(bid) _accumulatorRegistry.clear() command = pickleSer._read_with_length(infile) + if isinstance(command, Broadcast): + command = pickleSer.loads(command.value) (func, deserializer, serializer) = command init_time = time.time() iterator = deserializer.load_stream(infile) |