author | Tejas Patil <tejasp@fb.com> | 2016-03-24 00:31:13 -0700
---|---|---
committer | Reynold Xin <rxin@databricks.com> | 2016-03-24 00:31:13 -0700
commit | 01849da080439d1f2dbb90a8985c661522ed3d7a (patch)
tree | 9591413e38ade17e65658599a9bbad8b27c98829 /core/src/main/scala
parent | c44d140cae99d0b880e6d25f158125ad3adc6a05 (diff)
[SPARK-14110][CORE] PipedRDD to print the command ran on non zero exit
## What changes were proposed in this pull request?
When the subprocess launched by a PipedRDD fails, the resulting exception reads only "Subprocess exited with status XXX". This is hard for users to debug, especially when the Spark application contains multiple pipe() operations.
Changes done:
- Changed the exception message emitted when a non-zero exit code is seen
- If the reader or writer thread sees an exception, log the command that was run. The current model propagates the exception "as is" so that upstream Spark logic can take the right action based on the exception type (e.g. a fetch failure needs a retry, while a fatal exception fails the stage / job). Wrapping the exception in a generic exception therefore would not work. Altering the exception message would preserve that guarantee, but it is ugly (and not all exception types have a constructor taking a string message).
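The "log, then rethrow unchanged" pattern described above can be sketched in isolation. This is not the actual Spark source; the names `command` and `childThreadException` mirror PipedRDD's fields but are standalone assumptions here, and `Console.err` stands in for Spark's `logError`.

```scala
// Standalone sketch of the log-and-rethrow pattern (assumed names, not Spark code).
import java.util.concurrent.atomic.AtomicReference

object PropagateSketch {
  val command: Seq[String] = Seq("grep", "-i", "error")
  val childThreadException = new AtomicReference[Throwable](null)

  def propagateChildException(): Unit = {
    val t = childThreadException.get()
    if (t != null) {
      // Log the command for debuggability, but rethrow `t` as-is so that
      // upstream handlers still see its original type (retry vs. fail).
      val commandRan = command.mkString(" ")
      Console.err.println(
        s"Caught exception while running pipe() operator. Command ran: $commandRan. " +
        s"Exception: ${t.getMessage}")
      throw t
    }
  }
}
```

Because the original throwable is rethrown untouched, a caller that pattern-matches on the exception type behaves exactly as it did before this change; only the log output gains context.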
## How was this patch tested?
- Added a new test case
- Ran all existing tests for PipedRDD
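The behavior the new test exercises can be sketched as follows: a non-zero exit status produces an IllegalStateException whose message includes the command that was run. `checkExitStatus` is an illustrative helper written for this sketch, not Spark's API.

```scala
// Hypothetical helper mirroring the exit-status check in the diff below.
object ExitStatusSketch {
  def checkExitStatus(exitStatus: Int, command: Seq[String]): Unit = {
    if (exitStatus != 0) {
      // Include the command in the message so multiple pipe() operations
      // in one application can be told apart when one of them fails.
      throw new IllegalStateException(
        s"Subprocess exited with status $exitStatus. " +
          "Command ran: " + command.mkString(" "))
    }
  }
}
```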
Author: Tejas Patil <tejasp@fb.com>
Closes #11927 from tejasapatil/SPARK-14110-piperdd-failure.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 6
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 50b4184e48..dd8e46ba0f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -184,7 +184,8 @@ private[spark] class PipedRDD[T: ClassTag](
         val exitStatus = proc.waitFor()
         cleanup()
         if (exitStatus != 0) {
-          throw new IllegalStateException(s"Subprocess exited with status $exitStatus")
+          throw new IllegalStateException(s"Subprocess exited with status $exitStatus. " +
+            s"Command ran: " + command.mkString(" "))
         }
         false
       }
@@ -205,6 +206,9 @@ private[spark] class PipedRDD[T: ClassTag](
   private def propagateChildException(): Unit = {
     val t = childThreadException.get()
     if (t != null) {
+      val commandRan = command.mkString(" ")
+      logError(s"Caught exception while running pipe() operator. Command ran: $commandRan. " +
+        s"Exception: ${t.getMessage}")
       proc.destroy()
       cleanup()
       throw t