| author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
|---|---|---|
| committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
| commit | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch) | |
| tree | 3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /python/pyspark/worker.py | |
| parent | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff) | |
| parent | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff) | |
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'python/pyspark/worker.py')
| -rw-r--r-- | python/pyspark/worker.py | 59 |
|---|---|---|

1 file changed, 59 insertions, 0 deletions
```diff
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
new file mode 100644
index 0000000000..812e7a9da5
--- /dev/null
+++ b/python/pyspark/worker.py
@@ -0,0 +1,59 @@
+"""
+Worker that receives input from Piped RDD.
+"""
+import os
+import sys
+import traceback
+from base64 import standard_b64decode
+# CloudPickler needs to be imported so that depicklers are registered using the
+# copy_reg module.
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.broadcast import Broadcast, _broadcastRegistry
+from pyspark.cloudpickle import CloudPickler
+from pyspark.files import SparkFiles
+from pyspark.serializers import write_with_length, read_with_length, write_int, \
+    read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+
+
+# Redirect stdout to stderr so that users must return values from functions.
+old_stdout = os.fdopen(os.dup(1), 'w')
+os.dup2(2, 1)
+
+
+def load_obj():
+    return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+
+
+def main():
+    split_index = read_int(sys.stdin)
+    spark_files_dir = load_pickle(read_with_length(sys.stdin))
+    SparkFiles._root_directory = spark_files_dir
+    SparkFiles._is_running_on_worker = True
+    sys.path.append(spark_files_dir)
+    num_broadcast_variables = read_int(sys.stdin)
+    for _ in range(num_broadcast_variables):
+        bid = read_long(sys.stdin)
+        value = read_with_length(sys.stdin)
+        _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
+    func = load_obj()
+    bypassSerializer = load_obj()
+    if bypassSerializer:
+        dumps = lambda x: x
+    else:
+        dumps = dump_pickle
+    iterator = read_from_pickle_file(sys.stdin)
+    try:
+        for obj in func(split_index, iterator):
+            write_with_length(dumps(obj), old_stdout)
+    except Exception as e:
+        write_int(-2, old_stdout)
+        write_with_length(traceback.format_exc(), old_stdout)
+        sys.exit(-1)
+    # Mark the beginning of the accumulators section of the output
+    write_int(-1, old_stdout)
+    for aid, accum in _accumulatorRegistry.items():
+        write_with_length(dump_pickle((aid, accum._value)), old_stdout)
+
+
+if __name__ == '__main__':
+    main()
```
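For context, `main()` above implies a simple framed protocol on the worker's stdin: partition index, SparkFiles directory, broadcast variables, the task function, a serializer-bypass flag, then the input partition. The sketch below builds that byte stream from the driver's side. It is a minimal illustration under stated assumptions: the `write_int`, `write_with_length`, and `dump_pickle` helpers re-implemented here are assumed to match `pyspark.serializers` of this era (4-byte big-endian ints via `"!i"`, 8-byte broadcast ids via `"!q"` for `read_long`, pickle protocol 2), and `times_two` plus the `/tmp/spark-files` path are hypothetical stand-ins, not part of the commit.

```python
import pickle
import struct
from base64 import standard_b64encode
from io import BytesIO


def write_int(value, stream):
    # Assumed framing: 4-byte big-endian signed int ("!i"),
    # mirroring pyspark.serializers.write_int of this era.
    stream.write(struct.pack("!i", value))


def write_with_length(data, stream):
    # Length-prefixed byte string: 4-byte length, then the payload.
    write_int(len(data), stream)
    stream.write(data)


def dump_pickle(obj):
    # Assumed to match pyspark.serializers.dump_pickle (pickle protocol 2).
    return pickle.dumps(obj, 2)


def times_two(split_index, iterator):
    # Hypothetical task function. The real driver pickles closures with
    # CloudPickler; a module-level function works with plain pickle here.
    return (x * 2 for x in iterator)


stream = BytesIO()

# 1. Partition index of this task (consumed by read_int).
write_int(0, stream)

# 2. Pickled SparkFiles root directory, length-prefixed.
write_with_length(dump_pickle("/tmp/spark-files"), stream)

# 3. Broadcast variables: a count, then (8-byte id, pickled value) pairs.
write_int(1, stream)
stream.write(struct.pack("!q", 7))  # broadcast id, consumed by read_long
write_with_length(dump_pickle([1, 2, 3]), stream)

# 4. The task function and the bypassSerializer flag, each sent as one
#    line of base64-encoded pickle (what load_obj() reads from stdin).
stream.write(standard_b64encode(dump_pickle(times_two)) + b"\n")
stream.write(standard_b64encode(dump_pickle(False)) + b"\n")

# 5. The input partition: length-prefixed pickles until EOF,
#    matching read_from_pickle_file.
for item in [10, 20, 30]:
    write_with_length(dump_pickle(item), stream)

print("%d bytes of worker input" % len(stream.getvalue()))
```

On the output side, the diff shows the reverse framing on the saved stdout descriptor: each result is written length-prefixed, then -1 marks the start of the accumulator section (one length-prefixed `(aid, value)` pickle per registered accumulator), while -2 followed by a length-prefixed traceback string signals that the user function raised.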