diff options
author | Josh Rosen <joshrosen@apache.org> | 2013-11-10 12:58:28 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2013-11-10 16:46:00 -0800 |
commit | ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0 (patch) | |
tree | 972ab8bb7b02ee9903a524c28f24c9399c30d4fd /python/pyspark/worker.py | |
parent | cbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (diff) | |
download | spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.gz spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.bz2 spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.zip |
Send PySpark commands as bytes instead of strings.
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 12 |
1 file changed, 2 insertions, 10 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 5b16d5db7e..2751f1239e 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,7 +23,6 @@ import sys import time import socket import traceback -from base64 import standard_b64decode # CloudPickler needs to be imported so that depicklers are registered using the # copy_reg module. from pyspark.accumulators import _accumulatorRegistry @@ -38,11 +37,6 @@ pickleSer = PickleSerializer() mutf8_deserializer = MUTF8Deserializer() -def load_obj(infile): - decoded = standard_b64decode(infile.readline().strip()) - return pickleSer._loads(decoded) - - def report_times(outfile, boot, init, finish): write_int(SpecialLengths.TIMING_DATA, outfile) write_long(1000 * boot, outfile) @@ -75,10 +69,8 @@ def main(infile, outfile): filename = mutf8_deserializer._loads(infile) sys.path.append(os.path.join(spark_files_dir, filename)) - # Load this stage's function and serializer: - func = load_obj(infile) - deserializer = load_obj(infile) - serializer = load_obj(infile) + command = pickleSer._read_with_length(infile) + (func, deserializer, serializer) = command init_time = time.time() try: iterator = deserializer.load_stream(infile) |