author	Davies Liu <davies.liu@gmail.com>	2014-08-16 16:59:34 -0700
committer	Josh Rosen <joshrosen@apache.org>	2014-08-16 16:59:34 -0700
commit	2fc8aca086a2679b854038b7e2c488f19039ecbd (patch)
tree	72d04bfa5c065c84b62b095e8d247402df384289 /python/pyspark/worker.py
parent	379e7585c356f20bf8b4878ecba9401e2195da12 (diff)
[SPARK-1065] [PySpark] improve support for large broadcast
Passing large objects through py4j is very slow and costs a lot of memory, so broadcast objects are now passed via files (similar to parallelize()). An option is added to keep the object in the driver (False by default) to save memory in the driver.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1912 from davies/broadcast and squashes the following commits:

e06df4a [Davies Liu] load broadcast from disk in driver automatically
db3f232 [Davies Liu] fix serialization of accumulator
631a827 [Davies Liu] Merge branch 'master' into broadcast
c7baa8c [Davies Liu] compress serialized broadcast and command
9a7161f [Davies Liu] fix doc tests
e93cf4b [Davies Liu] address comments: add test
6226189 [Davies Liu] improve large broadcast
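For context on the compression half of this change, the sketch below shows the core idea of wrapping an existing serializer so its payload is zlib-compressed before crossing the driver/worker boundary. This is a minimal standalone model, not the actual pyspark.serializers implementation; the class bodies here are simplified assumptions for illustration.

import pickle
import zlib


class PickleSerializer(object):
    # Stand-in for pyspark.serializers.PickleSerializer.
    def dumps(self, obj):
        return pickle.dumps(obj, protocol=2)

    def loads(self, data):
        return pickle.loads(data)


class CompressedSerializer(object):
    # Wraps another serializer and zlib-compresses its output;
    # a sketch of the idea behind pyspark.serializers.CompressedSerializer.
    def __init__(self, serializer):
        self.serializer = serializer

    def dumps(self, obj):
        # Compression level 1 trades ratio for speed, which suits
        # one-shot transfers of large broadcast payloads.
        return zlib.compress(self.serializer.dumps(obj), 1)

    def loads(self, data):
        return self.serializer.loads(zlib.decompress(data))


ser = CompressedSerializer(PickleSerializer())
blob = ser.dumps({"weights": [0.0] * 10000})
assert ser.loads(blob)["weights"][0] == 0.0

Because the compressed bytes are written to a file rather than pushed through py4j, the driver no longer has to hold a second serialized copy of the object in the JVM.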
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--	python/pyspark/worker.py	8
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 2770f63059..77a9c4a0e0 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -30,7 +30,8 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
 from pyspark.files import SparkFiles
 from pyspark.serializers import write_with_length, write_int, read_long, \
-    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer
+    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
+    CompressedSerializer
 
 
 pickleSer = PickleSerializer()
@@ -65,12 +66,13 @@ def main(infile, outfile):
 
     # fetch names and values of broadcast variables
     num_broadcast_variables = read_int(infile)
+    ser = CompressedSerializer(pickleSer)
     for _ in range(num_broadcast_variables):
         bid = read_long(infile)
-        value = pickleSer._read_with_length(infile)
+        value = ser._read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, value)
 
-    command = pickleSer._read_with_length(infile)
+    command = ser._read_with_length(infile)
     (func, deserializer, serializer) = command
     init_time = time.time()
     iterator = deserializer.load_stream(infile)
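To make the hunk above easier to follow, here is a self-contained sketch of the framing the worker loop reads: an int count, then per broadcast a long id followed by a length-prefixed, zlib-compressed pickle. The helper names mirror pyspark.serializers (which uses big-endian "!i"/"!q" struct formats on this stream), but the bodies are simplified assumptions, not the real implementations.

import io
import pickle
import struct
import zlib


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def read_long(stream):
    return struct.unpack("!q", stream.read(8))[0]


def write_with_length(data, stream):
    # Length-prefix the payload so the reader knows how much to consume.
    write_int(len(data), stream)
    stream.write(data)


def read_with_length(stream):
    return stream.read(read_int(stream))


# "Driver" side: frame each broadcast as (id, length-prefixed compressed pickle).
out = io.BytesIO()
broadcasts = {1: ["a"] * 1000, 2: {"threshold": 42}}
write_int(len(broadcasts), out)
for bid, value in broadcasts.items():
    write_long(bid, out)
    write_with_length(zlib.compress(pickle.dumps(value, 2), 1), out)

# "Worker" side: mirrors the broadcast loop in main() above.
infile = io.BytesIO(out.getvalue())
registry = {}
for _ in range(read_int(infile)):
    bid = read_long(infile)
    registry[bid] = pickle.loads(zlib.decompress(read_with_length(infile)))

assert registry[2]["threshold"] == 42

Reading the command through the same CompressedSerializer keeps the framing symmetric on both sides of the stream, which is why the diff swaps pickleSer for ser in both the broadcast loop and the command read.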