aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-08 16:04:41 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-08 16:05:02 -0800
commitb57dd0f16024a82dfc223e69528b9908b931f068 (patch)
tree8ad1222593da58eaeb7746aecaef2c41c5313f71 /python/pyspark/worker.py
parent33beba39656fc64984db09a82fc69ca4edcc02d4 (diff)
downloadspark-b57dd0f16024a82dfc223e69528b9908b931f068.tar.gz
spark-b57dd0f16024a82dfc223e69528b9908b931f068.tar.bz2
spark-b57dd0f16024a82dfc223e69528b9908b931f068.zip
Add mapPartitionsWithSplit() to PySpark.
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--python/pyspark/worker.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 9f6b507dbd..3d792bbaa2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -21,6 +21,7 @@ def load_obj():
def main():
+ split_index = read_int(sys.stdin)
num_broadcast_variables = read_int(sys.stdin)
for _ in range(num_broadcast_variables):
bid = read_long(sys.stdin)
@@ -32,7 +33,8 @@ def main():
dumps = lambda x: x
else:
dumps = dump_pickle
- for obj in func(read_from_pickle_file(sys.stdin)):
+ iterator = read_from_pickle_file(sys.stdin)
+ for obj in func(split_index, iterator):
write_with_length(dumps(obj), old_stdout)