diff options
author | Josh Rosen <rosenville@gmail.com> | 2013-01-20 11:00:36 -0800 |
---|---|---|
committer | Josh Rosen <rosenville@gmail.com> | 2013-01-20 11:00:36 -0800 |
commit | 9f54d7e1f5a5e6f80b3d710de67f800bef943d33 (patch) | |
tree | afddd75af4070c9b223688a2f1088b359d7e66f0 /python/pyspark/worker.py | |
parent | 2a8c2a67909c4878ea24ec94f203287e55dd3782 (diff) | |
parent | ee5a07955c222dce16d0ffb9bde7f61033763c16 (diff) | |
download | spark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.tar.gz spark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.tar.bz2 spark-9f54d7e1f5a5e6f80b3d710de67f800bef943d33.zip |
Merge pull request #387 from mateiz/python-accumulators
Add accumulators to PySpark
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 7 |
1 files changed, 6 insertions, 1 deletions
```diff
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3d792bbaa2..b2b9288089 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -5,9 +5,10 @@ import sys
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
 # copy_reg module.
+from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
-from pyspark.serializers import write_with_length, read_with_length, \
+from pyspark.serializers import write_with_length, read_with_length, write_int, \
     read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
 
 
@@ -36,6 +37,10 @@ def main():
     iterator = read_from_pickle_file(sys.stdin)
     for obj in func(split_index, iterator):
         write_with_length(dumps(obj), old_stdout)
+    # Mark the beginning of the accumulators section of the output
+    write_int(-1, old_stdout)
+    for aid, accum in _accumulatorRegistry.items():
+        write_with_length(dump_pickle((aid, accum._value)), old_stdout)
 
 
 if __name__ == '__main__':
```