diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
commit | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch) | |
tree | 3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /python/pyspark/broadcast.py | |
parent | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff) | |
parent | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff) | |
download | spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2 spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip |
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'python/pyspark/broadcast.py')
-rw-r--r-- | python/pyspark/broadcast.py | 39 |
1 files changed, 39 insertions, 0 deletions
"""
Broadcast variable handle and the registry used to resolve unpickled handles.

>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]

>>> from pyspark.broadcast import _broadcastRegistry
>>> _broadcastRegistry[b.bid] = b
>>> from cPickle import dumps, loads
>>> loads(dumps(b)).value
[1, 2, 3, 4, 5]

>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

>>> large_broadcast = sc.broadcast(list(range(10000)))
"""
# Holds broadcasted data received from Java, keyed by its id.
_broadcastRegistry = {}


def _from_id(bid):
    """Return the Broadcast registered under ``bid``.

    This is the reconstructor named by ``Broadcast.__reduce__``: unpickling a
    Broadcast calls this function with the broadcast id instead of rebuilding
    the object from its value.

    Raises ``Exception`` if no broadcast with that id is in the registry.
    """
    # NOTE(review): the registry is re-imported from the canonical module
    # rather than read from this module's globals -- presumably so lookups hit
    # the same dict even when this code is loaded under another module name
    # (e.g. ``__main__`` while running the doctests); confirm before changing.
    from pyspark.broadcast import _broadcastRegistry
    if bid not in _broadcastRegistry:
        raise Exception("Broadcast variable '%s' not loaded!" % bid)
    return _broadcastRegistry[bid]


class Broadcast(object):
    """A broadcast value together with the id that identifies it.

    Attributes:
        value: the broadcasted Python object.
        bid: the broadcast id; key into ``_broadcastRegistry``.
    """

    def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
        """Store the broadcast id, its value and the optional collaborators.

        ``java_broadcast`` is kept as ``_jbroadcast`` and is not used by this
        class itself. ``pickle_registry``, when given, must provide an
        ``add(obj)`` method; every Broadcast that gets pickled is added to it.
        """
        self.value = value
        self.bid = bid
        self._jbroadcast = java_broadcast
        self._pickle_registry = pickle_registry

    def __reduce__(self):
        """Pickle this object as a reference to its id, not as its value.

        The pickled form is ``(_from_id, (bid,))``, so unpickling looks the
        broadcast up in ``_broadcastRegistry``.
        """
        # ``pickle_registry`` defaults to None in __init__; only record the
        # pickling when a registry was actually supplied, instead of failing
        # with AttributeError on None.
        if self._pickle_registry is not None:
            self._pickle_registry.add(self)
        return (_from_id, (self.bid, ))