aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
author Xiangrui Meng <meng@databricks.com> 2014-04-29 18:06:45 -0700
committer Matei Zaharia <matei@databricks.com> 2014-04-29 18:06:45 -0700
commit d33df1c151f8e982edd7324edc06d8cd3024dd34 (patch)
tree 9009241a55a2a85b051e0f74fa42919d7ffb85f7 /python/pyspark/rdd.py
parent bf8d0aa27844b1e58f131d49a3f668d1614ca6e7 (diff)
download spark-d33df1c151f8e982edd7324edc06d8cd3024dd34.tar.gz
spark-d33df1c151f8e982edd7324edc06d8cd3024dd34.tar.bz2
spark-d33df1c151f8e982edd7324edc06d8cd3024dd34.zip
[SPARK-1674] fix interrupted system call error in pyspark's RDD.pipe
The doctest for `RDD.pipe` throws an "Interrupted system call" exception on Mac. It can be fixed by wrapping `pipe.stdout.readline` in an iterator. Author: Xiangrui Meng <meng@databricks.com> Closes #594 from mengxr/pyspark-pipe and squashes the following commits: cc32ac9 [Xiangrui Meng] fix interrupted system call error in pyspark's RDD.pipe
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- python/pyspark/rdd.py 6
1 file changed, 3 insertions, 3 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a59778c721..3a1c56af5b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -537,8 +537,8 @@ class RDD(object):
"""
Return an RDD created by piping elements to a forked external process.
- >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
- ['1', '2', '3']
+ >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
+ ['1', '2', '', '3']
"""
def func(iterator):
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
@@ -547,7 +547,7 @@ class RDD(object):
out.write(str(obj).rstrip('\n') + '\n')
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
- return (x.rstrip('\n') for x in pipe.stdout)
+ return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
return self.mapPartitions(func)
def foreach(self, f):