aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkException.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala23
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala10
3 files changed, 34 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 2ebd7a7151..977a27bdfe 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)
+
+/**
+ * Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
+ * the parent SparkSubmit process to exit with the same exit code.
+ */
+private[spark] case class SparkUserAppException(exitCode: Int)
+ extends SparkException(s"User application exited with $exitCode")
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index c2ed43a539..4277ac2ad1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try
+import org.apache.spark.SparkUserAppException
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
@@ -46,7 +47,14 @@ object PythonRunner {
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
- gatewayServer.start()
+ val thread = new Thread(new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ gatewayServer.start()
+ }
+ })
+ thread.setName("py4j-gateway")
+ thread.setDaemon(true)
+ thread.start()
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
@@ -64,11 +72,18 @@ object PythonRunner {
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
- val process = builder.start()
+ try {
+ val process = builder.start()
- new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+ new RedirectThread(process.getInputStream, System.out, "redirect output").start()
- System.exit(process.waitFor())
+ val exitCode = process.waitFor()
+ if (exitCode != 0) {
+ throw new SparkUserAppException(exitCode)
+ }
+ } finally {
+ gatewayServer.shutdown()
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 02fa3088ed..86fcf942c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
+import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -672,7 +672,13 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
- throw findCause(t)
+ findCause(t) match {
+ case SparkUserAppException(exitCode) =>
+ System.exit(exitCode)
+
+ case t: Throwable =>
+ throw t
+ }
}
}