aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/broadcast.py
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@eecs.berkeley.edu> 2013-01-01 14:48:45 -0800
committer Josh Rosen <joshrosen@eecs.berkeley.edu> 2013-01-01 15:05:00 -0800
commit b58340dbd9a741331fc4c3829b08c093560056c2 (patch)
tree 52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/pyspark/broadcast.py
parent 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff)
download spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.gz
spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.bz2
spark-b58340dbd9a741331fc4c3829b08c093560056c2.zip
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/pyspark/broadcast.py')
-rw-r--r-- python/pyspark/broadcast.py 48
1 files changed, 48 insertions, 0 deletions
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
new file mode 100644
index 0000000000..93876fa738
--- /dev/null
+++ b/python/pyspark/broadcast.py
@@ -0,0 +1,48 @@
+"""
+>>> from pyspark.context import SparkContext
+>>> sc = SparkContext('local', 'test')
+>>> b = sc.broadcast([1, 2, 3, 4, 5])
+>>> b.value
+[1, 2, 3, 4, 5]
+
+>>> from pyspark.broadcast import _broadcastRegistry
+>>> _broadcastRegistry[b.bid] = b
+>>> from cPickle import dumps, loads
+>>> loads(dumps(b)).value
+[1, 2, 3, 4, 5]
+
+>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
+[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
+
+>>> large_broadcast = sc.broadcast(list(range(10000)))
+"""
+# Holds broadcasted data received from Java, keyed by its id.
+# _from_id() looks entries up here by ``bid``; Broadcast.__reduce__ names
+# _from_id as the reconstructor, so unpickling a Broadcast reads this dict.
+_broadcastRegistry = {}
+
+
+def _from_id(bid):
+    """Return the object stored in the broadcast registry under ``bid``.
+
+    ``Broadcast.__reduce__`` returns this function as its pickle
+    reconstructor, so it runs when a pickled ``Broadcast`` is loaded.
+
+    Raises ``Exception`` when ``bid`` has no entry in the registry.
+    """
+    # NOTE(review): the registry is imported through the package path rather
+    # than read from this module's globals -- presumably so the lookup hits
+    # the ``pyspark.broadcast`` module's dict even when this file is executed
+    # as ``__main__``; confirm before simplifying.
+    from pyspark.broadcast import _broadcastRegistry as registry
+    if bid in registry:
+        return registry[bid]
+    raise Exception("Broadcast variable '%s' not loaded!" % bid)
+
+
+class Broadcast(object):
+    """A value paired with the id (``bid``) it is registered under.
+
+    Instances pickle by id rather than by value: ``__reduce__`` emits a
+    reference to ``_from_id(bid)``, so loading the pickle looks the id up
+    in the broadcast registry instead of carrying ``value`` along.
+    """
+
+    def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
+        """Store the broadcast's id, value and optional collaborators.
+
+        bid             -- identifier used as the registry key.
+        value           -- the broadcast data, exposed as ``self.value``.
+        java_broadcast  -- optional handle kept as ``self._jbroadcast``;
+                           not used by any code in this class.
+        pickle_registry -- optional object with an ``add`` method; each
+                           instance that gets pickled is added to it.
+        """
+        self.value = value
+        self.bid = bid
+        self._jbroadcast = java_broadcast
+        self._pickle_registry = pickle_registry
+
+    def __reduce__(self):
+        """Pickle as a call to ``_from_id(self.bid)``.
+
+        As a side effect, records this instance in the pickle registry
+        when one was supplied to the constructor.
+        """
+        # pickle_registry defaults to None; calling .add() unconditionally
+        # made pickling such an instance fail with an AttributeError.
+        if self._pickle_registry is not None:
+            self._pickle_registry.add(self)
+        return (_from_id, (self.bid, ))
+
+
+def _test():
+    """Run doctest via ``testmod()`` with its default arguments."""
+    from doctest import testmod
+    testmod()
+
+
+# Run the doctests only when this file is executed directly as a script.
+if __name__ == "__main__":
+ _test()