aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/broadcast.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
commit6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch)
tree3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /python/pyspark/broadcast.py
parent56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff)
parent8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff)
downloadspark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip
Merge branch 'streaming' into ScrapCode-streaming
Conflicts: streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'python/pyspark/broadcast.py')
-rw-r--r--python/pyspark/broadcast.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
new file mode 100644
index 0000000000..def810dd46
--- /dev/null
+++ b/python/pyspark/broadcast.py
@@ -0,0 +1,39 @@
+"""
+>>> from pyspark.context import SparkContext
+>>> sc = SparkContext('local', 'test')
+>>> b = sc.broadcast([1, 2, 3, 4, 5])
+>>> b.value
+[1, 2, 3, 4, 5]
+
+>>> from pyspark.broadcast import _broadcastRegistry
+>>> _broadcastRegistry[b.bid] = b
+>>> from cPickle import dumps, loads
+>>> loads(dumps(b)).value
+[1, 2, 3, 4, 5]
+
+>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
+[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
+
+>>> large_broadcast = sc.broadcast(list(range(10000)))
+"""
+# Holds broadcasted data received from Java, keyed by its id.
+_broadcastRegistry = {}
+
+
+def _from_id(bid):
+ from pyspark.broadcast import _broadcastRegistry
+ if bid not in _broadcastRegistry:
+ raise Exception("Broadcast variable '%s' not loaded!" % bid)
+ return _broadcastRegistry[bid]
+
+
+class Broadcast(object):
+ def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
+ self.value = value
+ self.bid = bid
+ self._jbroadcast = java_broadcast
+ self._pickle_registry = pickle_registry
+
+ def __reduce__(self):
+ self._pickle_registry.add(self)
+ return (_from_id, (self.bid, ))