aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorJosh Rosen <rosenville@gmail.com>2013-01-20 11:00:36 -0800
committerJosh Rosen <rosenville@gmail.com>2013-01-20 11:00:36 -0800
commit9f54d7e1f5a5e6f80b3d710de67f800bef943d33 (patch)
treeafddd75af4070c9b223688a2f1088b359d7e66f0 /python/pyspark/worker.py
parent2a8c2a67909c4878ea24ec94f203287e55dd3782 (diff)
parentee5a07955c222dce16d0ffb9bde7f61033763c16 (diff)
downloadspark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.tar.gz
spark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.tar.bz2
spark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.zip
Merge pull request #387 from mateiz/python-accumulators
Add accumulators to PySpark
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py7
1 files changed, 6 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3d792bbaa2..b2b9288089 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -5,9 +5,10 @@ import sys
from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
+from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
-from pyspark.serializers import write_with_length, read_with_length, \
+from pyspark.serializers import write_with_length, read_with_length, write_int, \
read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
@@ -36,6 +37,10 @@ def main():
iterator = read_from_pickle_file(sys.stdin)
for obj in func(split_index, iterator):
write_with_length(dumps(obj), old_stdout)
+ # Mark the beginning of the accumulators section of the output
+ write_int(-1, old_stdout)
+ for aid, accum in _accumulatorRegistry.items():
+ write_with_length(dump_pickle((aid, accum._value)), old_stdout)
if __name__ == '__main__':