aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-10-22 15:04:41 -0700
committerAndrew Or <andrewor14@gmail.com>2014-10-22 15:04:41 -0700
commit137d94235383cc49ccf8a7bb7f314f578aa1dede (patch)
tree650feb18ef7afd0a2bbe3d03bcacc834f8428647 /yarn
parent813effc701fc27121c6f23ab32882932016fdbe0 (diff)
downloadspark-137d94235383cc49ccf8a7bb7f314f578aa1dede.tar.gz
spark-137d94235383cc49ccf8a7bb7f314f578aa1dede.tar.bz2
spark-137d94235383cc49ccf8a7bb7f314f578aa1dede.zip
[SPARK-3877][YARN] Throw an exception when application is not successful so that the exit code will be set to 1
When a yarn application fails (yarn-cluster mode), the exit code of spark-submit is still 0. It's hard for people to write some automatic scripts to run spark jobs in yarn because the failure cannot be detected in these scripts. This PR added a status checking after `monitorApplication`. If an application is not successful, `run()` will throw a `SparkException`, so that Client.scala will exit with code 1. Therefore, people can use the exit code of `spark-submit` to write some automatic scripts. Author: zsxwing <zsxwing@gmail.com> Closes #2732 from zsxwing/SPARK-3877 and squashes the following commits: 1f89fa5 [zsxwing] Fix the unit test a0498e1 [zsxwing] Update the docs and the error message e1cb9ef [zsxwing] Fix the hacky way of calling Client ff16fec [zsxwing] Remove System.exit in Client.scala and add a test 6a2c103 [zsxwing] [SPARK-3877] Throw an exception when application is not successful so that the exit code will be set to 1
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala12
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala29
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala2
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala12
-rw-r--r--yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala24
5 files changed, 44 insertions, 35 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5c7bca4541..9c66c78584 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -137,15 +137,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- try {
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- } catch {
- case e: Exception =>
- Console.err.println(e.getMessage)
- System.exit(1)
- }
-
- System.exit(0)
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 0efac4ea63..fb0e34bf59 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging {
/**
* Report the state of an application until it has exited, either successfully or
- * due to some failure, then return the application state.
+ * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
+ * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
+ * or KILLED).
*
* @param appId ID of the application to monitor.
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
* @param logApplicationReport Whether to log details of the application report every iteration.
- * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+ * @return A pair of the yarn application state and the final application state.
*/
def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
- logApplicationReport: Boolean = true): YarnApplicationState = {
+ logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
var lastState: YarnApplicationState = null
while (true) {
@@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging {
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
- return state
+ return (state, report.getFinalApplicationStatus)
}
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
- return state
+ return (state, report.getFinalApplicationStatus)
}
lastState = state
@@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging {
/**
* Submit an application to the ResourceManager and monitor its state.
* This continues until the application has exited for any reason.
+ * If the application finishes with a failed, killed, or undefined status,
+ * throw an appropriate SparkException.
*/
- def run(): Unit = monitorApplication(submitApplication())
+ def run(): Unit = {
+ val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+ if (yarnApplicationState == YarnApplicationState.FAILED ||
+ finalApplicationStatus == FinalApplicationStatus.FAILED) {
+ throw new SparkException("Application finished with failed status")
+ }
+ if (yarnApplicationState == YarnApplicationState.KILLED ||
+ finalApplicationStatus == FinalApplicationStatus.KILLED) {
+ throw new SparkException("Application is killed")
+ }
+ if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+ throw new SparkException("The final status of application is undefined")
+ }
+ }
/* --------------------------------------------------------------------------------------- *
| Methods that cannot be implemented here due to API differences across hadoop versions |
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6bb4b82316..d948a2aeed 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
private def waitForApplication(): Unit = {
assert(client != null && appId != null, "Application has not been submitted yet!")
- val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+ val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0b43e6ee20..addaddb711 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -135,15 +135,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- try {
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- } catch {
- case e: Exception =>
- Console.err.println(e.getMessage)
- System.exit(1)
- }
-
- System.exit(0)
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
}
}
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a826b2a78a..d79b85e867 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)
- // The Client object will call System.exit() after the job is done, and we don't want
- // that because it messes up the scalatest monitoring. So replicate some of what main()
- // does here.
val args = Array("--class", main,
"--jar", "file:" + fakeSparkJar.getAbsolutePath(),
"--arg", "yarn-cluster",
"--arg", result.getAbsolutePath(),
"--num-executors", "1")
- val sparkConf = new SparkConf()
- val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
- val clientArgs = new ClientArguments(args, sparkConf)
- new Client(clientArgs, yarnConf, sparkConf).run()
+ Client.main(args)
checkResult(result)
}
+ test("run Spark in yarn-cluster mode unsuccessfully") {
+ val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+ // Use only one argument so the driver will fail
+ val args = Array("--class", main,
+ "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+ "--arg", "yarn-cluster",
+ "--num-executors", "1")
+ val exception = intercept[SparkException] {
+ Client.main(args)
+ }
+ assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So