diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-24 17:20:10 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-24 17:20:10 -0800 |
commit | 4608902fb87af64a15b97ab21fe6382cd6e5a644 (patch) | |
tree | ef2e9e7a9d3f88dccd70e7b6e39753354b605207 /pyspark/pyspark/worker.py | |
parent | ccd075cf960df6c6c449b709515cdd81499a52be (diff) | |
download | spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.tar.gz spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.tar.bz2 spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.zip |
Use filesystem to collect RDDs in PySpark.
Passing large volumes of data through Py4J seems
to be slow. It appears to be faster to write the
data to the local filesystem and read it back from
Python.
Diffstat (limited to 'pyspark/pyspark/worker.py')
-rw-r--r-- | pyspark/pyspark/worker.py | 12 |
1 file changed, 2 insertions, 10 deletions
diff --git a/pyspark/pyspark/worker.py b/pyspark/pyspark/worker.py index 62824a1c9b..9f6b507dbd 100644 --- a/pyspark/pyspark/worker.py +++ b/pyspark/pyspark/worker.py @@ -8,7 +8,7 @@ from base64 import standard_b64decode from pyspark.broadcast import Broadcast, _broadcastRegistry from pyspark.cloudpickle import CloudPickler from pyspark.serializers import write_with_length, read_with_length, \ - read_long, read_int, dump_pickle, load_pickle + read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file # Redirect stdout to stderr so that users must return values from functions. @@ -20,14 +20,6 @@ def load_obj(): return load_pickle(standard_b64decode(sys.stdin.readline().strip())) -def read_input(): - try: - while True: - yield load_pickle(read_with_length(sys.stdin)) - except EOFError: - return - - def main(): num_broadcast_variables = read_int(sys.stdin) for _ in range(num_broadcast_variables): @@ -40,7 +32,7 @@ def main(): dumps = lambda x: x else: dumps = dump_pickle - for obj in func(read_input()): + for obj in func(read_from_pickle_file(sys.stdin)): write_with_length(dumps(obj), old_stdout) |