author	Tejas Patil <tejasp@fb.com>	2016-03-16 09:58:53 +0000
committer	Sean Owen <sowen@cloudera.com>	2016-03-16 09:58:53 +0000
commit	1d95fb6785dd77d879d3d60e15320f72ab185fd3 (patch)
tree	8408b60872b6266b93fe0ce02945154e80a25426
parent	56d88247f14ca54750816748f5b6b2aca7bc6fea (diff)
download	spark-1d95fb6785dd77d879d3d60e15320f72ab185fd3.tar.gz
	spark-1d95fb6785dd77d879d3d60e15320f72ab185fd3.tar.bz2
	spark-1d95fb6785dd77d879d3d60e15320f72ab185fd3.zip
[SPARK-13793][CORE] PipedRDD doesn't propagate exceptions while reading parent RDD
## What changes were proposed in this pull request?

PipedRDD creates a child thread to read the output of the parent stage and feed it to the pipe process. This patch uses a shared variable to save any exception thrown in the child thread; the main thread then propagates that exception if the variable was set.

## How was this patch tested?

- Added a unit test.
- Ran all the existing tests in PipedRDDSuite; they all pass with the change.
- Tested the patch with a real pipe() job: bounced the executor node that ran the parent stage to simulate a fetch failure, and observed that the parent stage was re-run.

Author: Tejas Patil <tejasp@fb.com>

Closes #11628 from tejasapatil/pipe_rdd.
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala	97
-rw-r--r--	core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala	21
2 files changed, 86 insertions, 32 deletions
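The heart of the change is a shared `AtomicReference[Throwable]`: the stderr-reader and stdin-writer threads record any failure into it instead of dying silently, and the iterator handed back to the caller rethrows it from `hasNext()`. A minimal, self-contained sketch of that hand-off pattern (the thread body and the exception are stand-ins for illustration, not the PipedRDD code):

```scala
import java.util.concurrent.atomic.AtomicReference

object ChildThreadExceptionDemo {
  def main(args: Array[String]): Unit = {
    // Shared slot for any failure raised on the worker thread.
    val childThreadException = new AtomicReference[Throwable](null)

    val writer = new Thread("stdin writer (demo)") {
      override def run(): Unit = {
        try {
          // Stand-in for feeding the parent RDD's iterator to the pipe.
          throw new RuntimeException("simulated failure while feeding the pipe")
        } catch {
          // Record the failure instead of letting it vanish with the thread.
          case t: Throwable => childThreadException.set(t)
        }
      }
    }
    writer.start()
    writer.join()

    // Main thread: rethrow the recorded failure, as hasNext() does in the patch.
    val t = childThreadException.get()
    if (t != null) {
      throw t
    }
  }
}
```

An `AtomicReference` suffices here because there is a single consumer and the child threads only ever set a value; its visibility guarantees let the main thread observe the failure without any locking.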
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index afbe566b76..50b4184e48 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -22,12 +22,14 @@ import java.io.FilenameFilter
import java.io.IOException
import java.io.PrintWriter
import java.util.StringTokenizer
+import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.util.Utils
@@ -118,63 +120,94 @@ private[spark] class PipedRDD[T: ClassTag](
val proc = pb.start()
val env = SparkEnv.get
+ val childThreadException = new AtomicReference[Throwable](null)
// Start a thread to print the process's stderr to ours
- new Thread("stderr reader for " + command) {
- override def run() {
- for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
- // scalastyle:off println
- System.err.println(line)
- // scalastyle:on println
+ new Thread(s"stderr reader for $command") {
+ override def run(): Unit = {
+ val err = proc.getErrorStream
+ try {
+ for (line <- Source.fromInputStream(err).getLines) {
+ // scalastyle:off println
+ System.err.println(line)
+ // scalastyle:on println
+ }
+ } catch {
+ case t: Throwable => childThreadException.set(t)
+ } finally {
+ err.close()
}
}
}.start()
// Start a thread to feed the process input from our parent's iterator
- new Thread("stdin writer for " + command) {
- override def run() {
+ new Thread(s"stdin writer for $command") {
+ override def run(): Unit = {
TaskContext.setTaskContext(context)
val out = new PrintWriter(proc.getOutputStream)
-
- // scalastyle:off println
- // input the pipe context firstly
- if (printPipeContext != null) {
- printPipeContext(out.println(_))
- }
- for (elem <- firstParent[T].iterator(split, context)) {
- if (printRDDElement != null) {
- printRDDElement(elem, out.println(_))
- } else {
- out.println(elem)
+ try {
+ // scalastyle:off println
+ // input the pipe context firstly
+ if (printPipeContext != null) {
+ printPipeContext(out.println)
+ }
+ for (elem <- firstParent[T].iterator(split, context)) {
+ if (printRDDElement != null) {
+ printRDDElement(elem, out.println)
+ } else {
+ out.println(elem)
+ }
}
+ // scalastyle:on println
+ } catch {
+ case t: Throwable => childThreadException.set(t)
+ } finally {
+ out.close()
}
- // scalastyle:on println
- out.close()
}
}.start()
// Return an iterator that read lines from the process's stdout
val lines = Source.fromInputStream(proc.getInputStream).getLines()
new Iterator[String] {
- def next(): String = lines.next()
- def hasNext: Boolean = {
- if (lines.hasNext) {
+ def next(): String = {
+ if (!hasNext()) {
+ throw new NoSuchElementException()
+ }
+ lines.next()
+ }
+
+ def hasNext(): Boolean = {
+ val result = if (lines.hasNext) {
true
} else {
val exitStatus = proc.waitFor()
+ cleanup()
if (exitStatus != 0) {
- throw new Exception("Subprocess exited with status " + exitStatus)
+ throw new IllegalStateException(s"Subprocess exited with status $exitStatus")
}
+ false
+ }
+ propagateChildException()
+ result
+ }
- // cleanup task working directory if used
- if (workInTaskDirectory) {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- Utils.deleteRecursively(new File(taskDirectory))
- }
- logDebug("Removed task working directory " + taskDirectory)
+ private def cleanup(): Unit = {
+ // cleanup task working directory if used
+ if (workInTaskDirectory) {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ Utils.deleteRecursively(new File(taskDirectory))
}
+ logDebug(s"Removed task working directory $taskDirectory")
+ }
+ }
- false
+ private def propagateChildException(): Unit = {
+ val t = childThreadException.get()
+ if (t != null) {
+ proc.destroy()
+ cleanup()
+ throw t
}
}
}
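With the fix applied, a failure while iterating over the pipe's input surfaces from the action instead of being swallowed in the stdin-writer thread. A hedged usage sketch (assumes a live SparkContext `sc`, e.g. in spark-shell, and `cat` on the executors' PATH; it mirrors the new test below):

```scala
import org.apache.spark.SparkException

// Parent that blows up partway through, simulating a bad upstream scenario.
val failing = sc.parallelize(1 to 4, 2).map { i =>
  if (i == 3) throw new RuntimeException("boom")
  i
}

// collect() now fails and the error reaches the driver, wrapped in a SparkException.
try {
  failing.pipe(Seq("cat")).collect()
} catch {
  case e: SparkException => println(s"propagated: ${e.getMessage}")
}
```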
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 1eebc924a5..d13da38ed0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -50,6 +50,27 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
+ test("failure in iterating over pipe input") {
+ if (testCommandAvailable("cat")) {
+ val nums =
+ sc.makeRDD(Array(1, 2, 3, 4), 2)
+ .mapPartitionsWithIndex((index, iterator) => {
+ new Iterator[Int] {
+ def hasNext = true
+ def next() = {
+ throw new SparkException("Exception to simulate bad scenario")
+ }
+ }
+ })
+
+ val piped = nums.pipe(Seq("cat"))
+
+ intercept[SparkException] {
+ piped.collect()
+ }
+ }
+ }
+
test("advanced pipe") {
if (testCommandAvailable("cat")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)