diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-08-16 16:59:34 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-08-16 16:59:34 -0700 |
commit | 2fc8aca086a2679b854038b7e2c488f19039ecbd (patch) | |
tree | 72d04bfa5c065c84b62b095e8d247402df384289 /python/pyspark/broadcast.py | |
parent | 379e7585c356f20bf8b4878ecba9401e2195da12 (diff) | |
download | spark-2fc8aca086a2679b854038b7e2c488f19039ecbd.tar.gz spark-2fc8aca086a2679b854038b7e2c488f19039ecbd.tar.bz2 spark-2fc8aca086a2679b854038b7e2c488f19039ecbd.zip |
[SPARK-1065] [PySpark] improve supporting for large broadcast
Passing large object by py4j is very slow (cost much memory), so pass broadcast objects via files (similar to parallelize()).
Add an option to keep object in driver (it's False by default) to save memory in driver.
Author: Davies Liu <davies.liu@gmail.com>
Closes #1912 from davies/broadcast and squashes the following commits:
e06df4a [Davies Liu] load broadcast from disk in driver automatically
db3f232 [Davies Liu] fix serialization of accumulator
631a827 [Davies Liu] Merge branch 'master' into broadcast
c7baa8c [Davies Liu] compress serrialized broadcast and command
9a7161f [Davies Liu] fix doc tests
e93cf4b [Davies Liu] address comments: add test
6226189 [Davies Liu] improve large broadcast
Diffstat (limited to 'python/pyspark/broadcast.py')
-rw-r--r-- | python/pyspark/broadcast.py | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index f3e64989ed..675a2fcd2f 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -21,18 +21,16 @@ >>> b = sc.broadcast([1, 2, 3, 4, 5]) >>> b.value [1, 2, 3, 4, 5] - ->>> from pyspark.broadcast import _broadcastRegistry ->>> _broadcastRegistry[b.bid] = b ->>> from cPickle import dumps, loads ->>> loads(dumps(b)).value -[1, 2, 3, 4, 5] - >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect() [1, 2, 3, 4, 5, 1, 2, 3, 4, 5] +>>> b.unpersist() >>> large_broadcast = sc.broadcast(list(range(10000))) """ +import os + +from pyspark.serializers import CompressedSerializer, PickleSerializer + # Holds broadcasted data received from Java, keyed by its id. _broadcastRegistry = {} @@ -52,17 +50,38 @@ class Broadcast(object): Access its value through C{.value}. """ - def __init__(self, bid, value, java_broadcast=None, pickle_registry=None): + def __init__(self, bid, value, java_broadcast=None, + pickle_registry=None, path=None): """ Should not be called directly by users -- use L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>} instead. """ - self.value = value self.bid = bid + if path is None: + self.value = value self._jbroadcast = java_broadcast self._pickle_registry = pickle_registry + self.path = path + + def unpersist(self, blocking=False): + self._jbroadcast.unpersist(blocking) + os.unlink(self.path) def __reduce__(self): self._pickle_registry.add(self) return (_from_id, (self.bid, )) + + def __getattr__(self, item): + if item == 'value' and self.path is not None: + ser = CompressedSerializer(PickleSerializer()) + value = ser.load_stream(open(self.path)).next() + self.value = value + return value + + raise AttributeError(item) + + +if __name__ == "__main__": + import doctest + doctest.testmod() |