diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 14:48:45 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-01-01 15:05:00 -0800 |
commit | b58340dbd9a741331fc4c3829b08c093560056c2 (patch) | |
tree | 52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/pyspark/worker.py | |
parent | 170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff) | |
download | spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.gz spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.bz2 spark-b58340dbd9a741331fc4c3829b08c093560056c2.zip |
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py new file mode 100644 index 0000000000..9f6b507dbd --- /dev/null +++ b/python/pyspark/worker.py @@ -0,0 +1,40 @@ +""" +Worker that receives input from Piped RDD. +""" +import sys +from base64 import standard_b64decode +# CloudPickler needs to be imported so that depicklers are registered using the +# copy_reg module. +from pyspark.broadcast import Broadcast, _broadcastRegistry +from pyspark.cloudpickle import CloudPickler +from pyspark.serializers import write_with_length, read_with_length, \ + read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file + + +# Redirect stdout to stderr so that users must return values from functions. +old_stdout = sys.stdout +sys.stdout = sys.stderr + + +def load_obj(): + return load_pickle(standard_b64decode(sys.stdin.readline().strip())) + + +def main(): + num_broadcast_variables = read_int(sys.stdin) + for _ in range(num_broadcast_variables): + bid = read_long(sys.stdin) + value = read_with_length(sys.stdin) + _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value)) + func = load_obj() + bypassSerializer = load_obj() + if bypassSerializer: + dumps = lambda x: x + else: + dumps = dump_pickle + for obj in func(read_from_pickle_file(sys.stdin)): + write_with_length(dumps(obj), old_stdout) + + +if __name__ == '__main__': + main() |