authorTejas Patil <tejasp@fb.com>2016-03-16 09:58:53 +0000
committerSean Owen <sowen@cloudera.com>2016-03-16 09:58:53 +0000
commit1d95fb6785dd77d879d3d60e15320f72ab185fd3 (patch)
tree8408b60872b6266b93fe0ce02945154e80a25426 /core/src/test
parent56d88247f14ca54750816748f5b6b2aca7bc6fea (diff)
[SPARK-13793][CORE] PipedRDD doesn't propagate exceptions while reading parent RDD
## What changes were proposed in this pull request?

PipedRDD creates a child thread to read the output of the parent stage and feed it to the pipe process. The fix saves any exception thrown in the child thread in a variable, and the main thread then propagates that exception if the variable was set.

## How was this patch tested?

- Added a unit test
- Ran all the existing tests in PipedRDDSuite and they all pass with the change
- Tested the patch with a real pipe() job, bounced the executor node which ran the parent stage to simulate a fetch failure, and observed that the parent stage was re-run

Author: Tejas Patil <tejasp@fb.com>

Closes #11628 from tejasapatil/pipe_rdd.
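The pattern the commit message describes, saving an exception raised in a child thread and re-throwing it from the main thread, can be sketched outside of Spark. This is a minimal illustration, not the actual PipedRDD code; the object name and the simulated failure are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference

object PropagateChildException {
  def main(args: Array[String]): Unit = {
    // Shared slot where the child thread records its failure.
    val childException = new AtomicReference[Throwable](null)

    val writer = new Thread("stdin-writer") {
      override def run(): Unit = {
        try {
          // Simulate the parent-RDD iterator failing while being read.
          throw new RuntimeException("simulated fetch failure")
        } catch {
          // Save the exception instead of letting the thread die silently.
          case t: Throwable => childException.set(t)
        }
      }
    }
    writer.start()
    writer.join()

    // Main thread: surface the child's failure rather than dropping it.
    val t = childException.get()
    if (t != null) {
      println(s"propagated: ${t.getMessage}")
    }
  }
}
```

Without the shared variable, the exception would terminate only the writer thread, and the main thread would see the pipe close early with no indication of why, which is the bug this patch fixes.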
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 21 +
1 file changed, 21 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 1eebc924a5..d13da38ed0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -50,6 +50,27 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
+  test("failure in iterating over pipe input") {
+    if (testCommandAvailable("cat")) {
+      val nums =
+        sc.makeRDD(Array(1, 2, 3, 4), 2)
+          .mapPartitionsWithIndex((index, iterator) => {
+            new Iterator[Int] {
+              def hasNext = true
+              def next() = {
+                throw new SparkException("Exception to simulate bad scenario")
+              }
+            }
+          })
+
+      val piped = nums.pipe(Seq("cat"))
+
+      intercept[SparkException] {
+        piped.collect()
+      }
+    }
+  }
+
test("advanced pipe") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)