diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-20 01:57:44 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-20 01:57:44 -0800 |
commit | 8e7f098a2c9e5e85cb9435f28d53a3a5847c14aa (patch) | |
tree | a4b5d1891501e99f44b4d797bee3a10504e0b2fd /python/pyspark/worker.py | |
parent | 54c0f9f185576e9b844fa8f81ca410f188daa51c (diff) | |
download | spark-8e7f098a2c9e5e85cb9435f28d53a3a5847c14aa.tar.gz spark-8e7f098a2c9e5e85cb9435f28d53a3a5847c14aa.tar.bz2 spark-8e7f098a2c9e5e85cb9435f28d53a3a5847c14aa.zip |
Added accumulators to PySpark
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 7 |
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3d792bbaa2..b2b9288089 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -5,9 +5,10 @@ import sys
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
 # copy_reg module.
+from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
-from pyspark.serializers import write_with_length, read_with_length, \
+from pyspark.serializers import write_with_length, read_with_length, write_int, \
     read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
@@ -36,6 +37,10 @@ def main():
     iterator = read_from_pickle_file(sys.stdin)
     for obj in func(split_index, iterator):
         write_with_length(dumps(obj), old_stdout)
+    # Mark the beginning of the accumulators section of the output
+    write_int(-1, old_stdout)
+    for aid, accum in _accumulatorRegistry.items():
+        write_with_length(dump_pickle((aid, accum._value)), old_stdout)


 if __name__ == '__main__':