diff options
author | Josh Rosen <joshrosen@apache.org> | 2013-11-02 21:13:18 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2013-11-03 10:54:24 -0800 |
commit | a48d88d206fae348720ab077a624b3c57293374f (patch) | |
tree | 1355082a9856dc512a98f4a2da6c82af10f92410 /python/pyspark/worker.py | |
parent | 41ead7a74533ffdd208a4ba2f7cd38945b4343ec (diff) | |
download | spark-a48d88d206fae348720ab077a624b3c57293374f.tar.gz spark-a48d88d206fae348720ab077a624b3c57293374f.tar.bz2 spark-a48d88d206fae348720ab077a624b3c57293374f.zip |
Replace magic lengths with constants in PySpark.
Write the length of the accumulators section up-front rather
than terminating it with a negative length. I find this
easier to read.
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d63c2aaef7..7696df9d1c 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -31,7 +31,8 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry from pyspark.cloudpickle import CloudPickler from pyspark.files import SparkFiles from pyspark.serializers import write_with_length, read_with_length, write_int, \ - read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file + read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file \ + SpecialLengths def load_obj(infile): @@ -39,7 +40,7 @@ def load_obj(infile): def report_times(outfile, boot, init, finish): - write_int(-3, outfile) + write_int(SpecialLengths.TIMING_DATA, outfile) write_long(1000 * boot, outfile) write_long(1000 * init, outfile) write_long(1000 * finish, outfile) @@ -82,16 +83,16 @@ def main(infile, outfile): for obj in func(split_index, iterator): write_with_length(dumps(obj), outfile) except Exception as e: - write_int(-2, outfile) + write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) write_with_length(traceback.format_exc(), outfile) sys.exit(-1) finish_time = time.time() report_times(outfile, boot_time, init_time, finish_time) # Mark the beginning of the accumulators section of the output - write_int(-1, outfile) - for aid, accum in _accumulatorRegistry.items(): + write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) + write_int(len(_accumulatorRegistry), outfile) + for (aid, accum) in _accumulatorRegistry.items(): write_with_length(dump_pickle((aid, accum._value)), outfile) - write_int(-1, outfile) if __name__ == '__main__': |