diff options
author | Scott Taylor <github@megatron.me.uk> | 2015-07-10 19:29:32 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-07-10 19:29:32 -0700 |
commit | 6e1c7e2798bb0b7b120e28a07c5d70fa162dd724 (patch) | |
tree | 5c15d36dccb086e55149d354faf9c493fb7c5228 /python/pyspark/rdd.py | |
parent | 33630883685eafcc3ee4521ea8363be342f6e6b4 (diff) | |
download | spark-6e1c7e2798bb0b7b120e28a07c5d70fa162dd724.tar.gz spark-6e1c7e2798bb0b7b120e28a07c5d70fa162dd724.tar.bz2 spark-6e1c7e2798bb0b7b120e28a07c5d70fa162dd724.zip |
[SPARK-7735] [PYSPARK] Raise Exception on non-zero exit from pipe commands
This will allow problems with piped commands to be detected.
This will also allow tasks to be retried where errors are rare (such as network problems in piped commands).
Author: Scott Taylor <github@megatron.me.uk>
Closes #6262 from megatron-me-uk/patch-2 and squashes the following commits:
04ae1d5 [Scott Taylor] Remove spurious empty line
98fa101 [Scott Taylor] fix blank line style error
574b564 [Scott Taylor] Merge pull request #2 from megatron-me-uk/patch-4
0c1e762 [Scott Taylor] Update rdd pipe method for checkCode
ab9a2e1 [Scott Taylor] Update rdd pipe tests for checkCode
eb4801c [Scott Taylor] fix fail_condition
b0ac3a4 [Scott Taylor] Merge pull request #1 from megatron-me-uk/megatron-me-uk-patch-1
a307d13 [Scott Taylor] update rdd tests to test pipe modes
34fcdc3 [Scott Taylor] add optional argument 'mode' for rdd.pipe
a0c0161 [Scott Taylor] fix generator issue
8a9ef9c [Scott Taylor] make check_return_code an iterator
0486ae3 [Scott Taylor] style fixes
8ed89a6 [Scott Taylor] Chain generators to prevent potential deadlock
4153b02 [Scott Taylor] fix list.sort returns None
491d3fc [Scott Taylor] Pass a function handle to assertRaises
3344a21 [Scott Taylor] wrap assertRaises with QuietTest
3ab8c7a [Scott Taylor] remove whitespace for style
cc1a73d [Scott Taylor] fix style issues in pipe test
8db4073 [Scott Taylor] Add a test for rdd pipe functions
1b3dc4e [Scott Taylor] fix missing space around operator style
0974f98 [Scott Taylor] add space between words in multiline string
45f4977 [Scott Taylor] fix line too long style error
5745d85 [Scott Taylor] Remove space to fix style
f552d49 [Scott Taylor] Catch non-zero exit from pipe commands
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 16 |
1 file changed, 14 insertions, 2 deletions
```diff
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 79dafb0a4e..3218bed5c7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -700,12 +700,14 @@ class RDD(object):
         return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
     @ignore_unicode_prefix
-    def pipe(self, command, env={}):
+    def pipe(self, command, env={}, checkCode=False):
         """
         Return an RDD created by piping elements to a forked external process.
 
         >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
         [u'1', u'2', u'', u'3']
+
+        :param checkCode: whether or not to check the return value of the shell command.
         """
         def func(iterator):
             pipe = Popen(
@@ -717,7 +719,17 @@ class RDD(object):
                     out.write(s.encode('utf-8'))
                 out.close()
             Thread(target=pipe_objs, args=[pipe.stdin]).start()
-            return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
+
+            def check_return_code():
+                pipe.wait()
+                if checkCode and pipe.returncode:
+                    raise Exception("Pipe function `%s' exited "
+                                    "with error code %d" % (command, pipe.returncode))
+                else:
+                    for i in range(0):
+                        yield i
+            return (x.rstrip(b'\n').decode('utf-8') for x in
+                    chain(iter(pipe.stdout.readline, b''), check_return_code()))
         return self.mapPartitions(func)
 
     def foreach(self, f):
```