about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-08-17 10:34:22 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-08-17 10:34:22 -0700
commitf68d024096c90936f9aa4e325141b39f08c72476 (patch)
tree0bf3f03b346f71d1fccf9775e37ded8d7c88bf0e /core
parented092a06c28dfa8204b473a5c964a9ef9a6b655e (diff)
downloadspark-f68d024096c90936f9aa4e325141b39f08c72476.tar.gz
spark-f68d024096c90936f9aa4e325141b39f08c72476.tar.bz2
spark-f68d024096c90936f9aa4e325141b39f08c72476.zip
[SPARK-7736] [CORE] [YARN] Make pyspark fail YARN app on failure.
The YARN backend doesn't like when user code calls `System.exit`, since it cannot know the exit status and thus cannot set an appropriate final status for the application. So, for pyspark, avoid that call and instead throw an exception with the exit code. SparkSubmit handles that exception and exits with the given exit code, while YARN uses the exit code as the failure code for the Spark app. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #7751 from vanzin/SPARK-9416.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkException.scala | 7
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala | 23
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 10
3 files changed, 34 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 2ebd7a7151..977a27bdfe 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)
+
+/**
+ * Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
+ * the parent SparkSubmit process to exit with the same exit code.
+ */
+private[spark] case class SparkUserAppException(exitCode: Int)
+ extends SparkException(s"User application exited with $exitCode")
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index c2ed43a539..4277ac2ad1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try
+import org.apache.spark.SparkUserAppException
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
@@ -46,7 +47,14 @@ object PythonRunner {
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
- gatewayServer.start()
+ val thread = new Thread(new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ gatewayServer.start()
+ }
+ })
+ thread.setName("py4j-gateway")
+ thread.setDaemon(true)
+ thread.start()
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
@@ -64,11 +72,18 @@ object PythonRunner {
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
- val process = builder.start()
+ try {
+ val process = builder.start()
- new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+ new RedirectThread(process.getInputStream, System.out, "redirect output").start()
- System.exit(process.waitFor())
+ val exitCode = process.waitFor()
+ if (exitCode != 0) {
+ throw new SparkUserAppException(exitCode)
+ }
+ } finally {
+ gatewayServer.shutdown()
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 02fa3088ed..86fcf942c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -672,7 +672,13 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
- throw findCause(t)
+ findCause(t) match {
+ case SparkUserAppException(exitCode) =>
+ System.exit(exitCode)
+
+ case t: Throwable =>
+ throw t
+ }
}
}