aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2013-11-02 21:13:18 -0700
committerJosh Rosen <joshrosen@apache.org>2013-11-03 10:54:24 -0800
commita48d88d206fae348720ab077a624b3c57293374f (patch)
tree1355082a9856dc512a98f4a2da6c82af10f92410 /python
parent41ead7a74533ffdd208a4ba2f7cd38945b4343ec (diff)
downloadspark-a48d88d206fae348720ab077a624b3c57293374f.tar.gz
spark-a48d88d206fae348720ab077a624b3c57293374f.tar.bz2
spark-a48d88d206fae348720ab077a624b3c57293374f.zip
Replace magic lengths with constants in PySpark.
Write the length of the accumulators section up-front rather than terminating it with a negative length. I find this easier to read.
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/serializers.py6
-rw-r--r--python/pyspark/worker.py13
2 files changed, 13 insertions, 6 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 54fed1c9c7..fbc280fd37 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -19,6 +19,12 @@ import struct
import cPickle
+class SpecialLengths(object):
+ END_OF_DATA_SECTION = -1
+ PYTHON_EXCEPTION_THROWN = -2
+ TIMING_DATA = -3
+
+
class Batch(object):
"""
Used to store multiple RDD entries as a single Java object.
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index d63c2aaef7..7696df9d1c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -31,7 +31,8 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, read_with_length, write_int, \
- read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+ read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file \
+ SpecialLengths
def load_obj(infile):
@@ -39,7 +40,7 @@ def load_obj(infile):
def report_times(outfile, boot, init, finish):
- write_int(-3, outfile)
+ write_int(SpecialLengths.TIMING_DATA, outfile)
write_long(1000 * boot, outfile)
write_long(1000 * init, outfile)
write_long(1000 * finish, outfile)
@@ -82,16 +83,16 @@ def main(infile, outfile):
for obj in func(split_index, iterator):
write_with_length(dumps(obj), outfile)
except Exception as e:
- write_int(-2, outfile)
+ write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc(), outfile)
sys.exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output
- write_int(-1, outfile)
- for aid, accum in _accumulatorRegistry.items():
+ write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
+ write_int(len(_accumulatorRegistry), outfile)
+ for (aid, accum) in _accumulatorRegistry.items():
write_with_length(dump_pickle((aid, accum._value)), outfile)
- write_int(-1, outfile)
if __name__ == '__main__':