about | summary | refs | log | tree | commit | diff
path: root/pyspark/pyspark/worker.py
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-12-24 17:20:10 -0800
committer: Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-12-24 17:20:10 -0800
commit: 4608902fb87af64a15b97ab21fe6382cd6e5a644 (patch)
tree: ef2e9e7a9d3f88dccd70e7b6e39753354b605207 /pyspark/pyspark/worker.py
parent: ccd075cf960df6c6c449b709515cdd81499a52be (diff)
download: spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.tar.gz
spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.tar.bz2
spark-4608902fb87af64a15b97ab21fe6382cd6e5a644.zip
Use filesystem to collect RDDs in PySpark.
Passing large volumes of data through Py4J seems to be slow. It appears to be faster to write the data to the local filesystem and read it back from Python.
Diffstat (limited to 'pyspark/pyspark/worker.py')
-rw-r--r-- pyspark/pyspark/worker.py 12
1 file changed, 2 insertions, 10 deletions
diff --git a/pyspark/pyspark/worker.py b/pyspark/pyspark/worker.py
index 62824a1c9b..9f6b507dbd 100644
--- a/pyspark/pyspark/worker.py
+++ b/pyspark/pyspark/worker.py
@@ -8,7 +8,7 @@ from base64 import standard_b64decode
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler
from pyspark.serializers import write_with_length, read_with_length, \
- read_long, read_int, dump_pickle, load_pickle
+ read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
# Redirect stdout to stderr so that users must return values from functions.
@@ -20,14 +20,6 @@ def load_obj():
return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
-def read_input():
- try:
- while True:
- yield load_pickle(read_with_length(sys.stdin))
- except EOFError:
- return
-
-
def main():
num_broadcast_variables = read_int(sys.stdin)
for _ in range(num_broadcast_variables):
@@ -40,7 +32,7 @@ def main():
dumps = lambda x: x
else:
dumps = dump_pickle
- for obj in func(read_input()):
+ for obj in func(read_from_pickle_file(sys.stdin)):
write_with_length(dumps(obj), old_stdout)