diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/serializers.py | 6 | ||||
-rw-r--r-- | python/pyspark/worker.py | 13 |
2 files changed, 13 insertions, 6 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 54fed1c9c7..fbc280fd37 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -19,6 +19,12 @@ import struct import cPickle +class SpecialLengths(object): + END_OF_DATA_SECTION = -1 + PYTHON_EXCEPTION_THROWN = -2 + TIMING_DATA = -3 + + class Batch(object): """ Used to store multiple RDD entries as a single Java object. diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d63c2aaef7..7696df9d1c 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -31,7 +31,8 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry from pyspark.cloudpickle import CloudPickler from pyspark.files import SparkFiles from pyspark.serializers import write_with_length, read_with_length, write_int, \ - read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file + read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file \ + SpecialLengths def load_obj(infile): @@ -39,7 +40,7 @@ def load_obj(infile): def report_times(outfile, boot, init, finish): - write_int(-3, outfile) + write_int(SpecialLengths.TIMING_DATA, outfile) write_long(1000 * boot, outfile) write_long(1000 * init, outfile) write_long(1000 * finish, outfile) @@ -82,16 +83,16 @@ def main(infile, outfile): for obj in func(split_index, iterator): write_with_length(dumps(obj), outfile) except Exception as e: - write_int(-2, outfile) + write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) write_with_length(traceback.format_exc(), outfile) sys.exit(-1) finish_time = time.time() report_times(outfile, boot_time, init_time, finish_time) # Mark the beginning of the accumulators section of the output - write_int(-1, outfile) - for aid, accum in _accumulatorRegistry.items(): + write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) + write_int(len(_accumulatorRegistry), outfile) + for (aid, accum) in _accumulatorRegistry.items(): write_with_length(dump_pickle((aid, accum._value)), outfile) - write_int(-1, outfile) if __name__ == '__main__': |