aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2013-11-10 12:58:28 -0800
committerJosh Rosen <joshrosen@apache.org>2013-11-10 16:46:00 -0800
commitffa5bedf46fbc89ad5c5658f3b423dfff49b70f0 (patch)
tree972ab8bb7b02ee9903a524c28f24c9399c30d4fd /python/pyspark/worker.py
parentcbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (diff)
downloadspark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.gz
spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.tar.bz2
spark-ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0.zip
Send PySpark commands as bytes insetad of strings.
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py12
1 files changed, 2 insertions, 10 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 5b16d5db7e..2751f1239e 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,7 +23,6 @@ import sys
import time
import socket
import traceback
-from base64 import standard_b64decode
# CloudPickler needs to be imported so that depicklers are registered using the
# copy_reg module.
from pyspark.accumulators import _accumulatorRegistry
@@ -38,11 +37,6 @@ pickleSer = PickleSerializer()
mutf8_deserializer = MUTF8Deserializer()
-def load_obj(infile):
- decoded = standard_b64decode(infile.readline().strip())
- return pickleSer._loads(decoded)
-
-
def report_times(outfile, boot, init, finish):
write_int(SpecialLengths.TIMING_DATA, outfile)
write_long(1000 * boot, outfile)
@@ -75,10 +69,8 @@ def main(infile, outfile):
filename = mutf8_deserializer._loads(infile)
sys.path.append(os.path.join(spark_files_dir, filename))
- # Load this stage's function and serializer:
- func = load_obj(infile)
- deserializer = load_obj(infile)
- serializer = load_obj(infile)
+ command = pickleSer._read_with_length(infile)
+ (func, deserializer, serializer) = command
init_time = time.time()
try:
iterator = deserializer.load_stream(infile)