| author | Tejas Patil <tejasp@fb.com> | 2016-03-16 09:58:53 +0000 |
|---|---|---|
| committer | Sean Owen <sowen@cloudera.com> | 2016-03-16 09:58:53 +0000 |
| commit | 1d95fb6785dd77d879d3d60e15320f72ab185fd3 (patch) | |
| tree | 8408b60872b6266b93fe0ce02945154e80a25426 /core/src/test | |
| parent | 56d88247f14ca54750816748f5b6b2aca7bc6fea (diff) | |
[SPARK-13793][CORE] PipedRDD doesn't propagate exceptions while reading parent RDD
## What changes were proposed in this pull request?
PipedRDD creates a child thread to read the output of the parent stage and feed it to the pipe process. This change saves any exception thrown in the child thread in a variable and re-throws it in the main thread if the variable was set, so the failure is propagated instead of being lost.
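The save-and-rethrow pattern described above can be sketched as follows. This is a minimal, self-contained illustration, not Spark's actual PipedRDD code; the names `ChildThreadExceptionPropagation` and `feedAndWait` are hypothetical:

```scala
// Minimal sketch of propagating an exception from a worker thread back to
// the calling thread: the child thread records any failure in a shared
// variable instead of letting it die silently, and the main thread
// re-throws it after joining. (Names here are illustrative, not Spark's.)
object ChildThreadExceptionPropagation {
  def feedAndWait(feed: () => Unit): Unit = {
    // @volatile so the write in the child thread is visible to the main thread.
    @volatile var childException: Throwable = null
    val child = new Thread("stdin writer") {
      override def run(): Unit = {
        try {
          feed()
        } catch {
          case t: Throwable => childException = t // save, don't swallow
        }
      }
    }
    child.start()
    child.join()
    // Propagate in the main thread if the child thread failed.
    if (childException != null) {
      throw new RuntimeException("Child thread failed", childException)
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      feedAndWait(() => throw new IllegalStateException("simulated read failure"))
    } catch {
      case e: RuntimeException =>
        println(s"caught: ${e.getCause.getMessage}")
    }
  }
}
```

Without the shared variable, an exception thrown while iterating over the parent RDD would terminate only the writer thread, and the caller would never see it.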
## How was this patch tested?
- Added a unit test
- Ran all the existing tests in PipedRDDSuite and they all pass with the change
- Tested the patch with a real pipe() job: bounced the executor node that ran the parent stage to simulate a fetch failure, and observed that the parent stage was re-run.
Author: Tejas Patil <tejasp@fb.com>
Closes #11628 from tejasapatil/pipe_rdd.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 21 |
1 file changed, 21 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 1eebc924a5..d13da38ed0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -50,6 +50,27 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("failure in iterating over pipe input") {
+    if (testCommandAvailable("cat")) {
+      val nums =
+        sc.makeRDD(Array(1, 2, 3, 4), 2)
+          .mapPartitionsWithIndex((index, iterator) => {
+            new Iterator[Int] {
+              def hasNext = true
+              def next() = {
+                throw new SparkException("Exception to simulate bad scenario")
+              }
+            }
+          })
+
+      val piped = nums.pipe(Seq("cat"))
+
+      intercept[SparkException] {
+        piped.collect()
+      }
+    }
+  }
+
   test("advanced pipe") {
     if (testCommandAvailable("cat")) {
       val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)