about summary refs log tree commit diff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala12
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala29
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala2
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala12
-rw-r--r--yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala24
5 files changed, 44 insertions, 35 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5c7bca4541..9c66c78584 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -137,15 +137,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- try {
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- } catch {
- case e: Exception =>
- Console.err.println(e.getMessage)
- System.exit(1)
- }
-
- System.exit(0)
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 0efac4ea63..fb0e34bf59 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -417,17 +417,19 @@ private[spark] trait ClientBase extends Logging {
/**
* Report the state of an application until it has exited, either successfully or
- * due to some failure, then return the application state.
+ * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
+ * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
+ * or KILLED).
*
* @param appId ID of the application to monitor.
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
* @param logApplicationReport Whether to log details of the application report every iteration.
- * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+ * @return A pair of the yarn application state and the final application state.
*/
def monitorApplication(
appId: ApplicationId,
returnOnRunning: Boolean = false,
- logApplicationReport: Boolean = true): YarnApplicationState = {
+ logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
var lastState: YarnApplicationState = null
while (true) {
@@ -468,11 +470,11 @@ private[spark] trait ClientBase extends Logging {
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
- return state
+ return (state, report.getFinalApplicationStatus)
}
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
- return state
+ return (state, report.getFinalApplicationStatus)
}
lastState = state
@@ -485,8 +487,23 @@ private[spark] trait ClientBase extends Logging {
/**
* Submit an application to the ResourceManager and monitor its state.
* This continues until the application has exited for any reason.
+ * If the application finishes with a failed, killed, or undefined status,
+ * throw an appropriate SparkException.
*/
- def run(): Unit = monitorApplication(submitApplication())
+ def run(): Unit = {
+ val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
+ if (yarnApplicationState == YarnApplicationState.FAILED ||
+ finalApplicationStatus == FinalApplicationStatus.FAILED) {
+ throw new SparkException("Application finished with failed status")
+ }
+ if (yarnApplicationState == YarnApplicationState.KILLED ||
+ finalApplicationStatus == FinalApplicationStatus.KILLED) {
+ throw new SparkException("Application is killed")
+ }
+ if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+ throw new SparkException("The final status of application is undefined")
+ }
+ }
/* --------------------------------------------------------------------------------------- *
| Methods that cannot be implemented here due to API differences across hadoop versions |
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6bb4b82316..d948a2aeed 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
private def waitForApplication(): Unit = {
assert(client != null && appId != null, "Application has not been submitted yet!")
- val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+ val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0b43e6ee20..addaddb711 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -135,15 +135,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf
- try {
- val args = new ClientArguments(argStrings, sparkConf)
- new Client(args, sparkConf).run()
- } catch {
- case e: Exception =>
- Console.err.println(e.getMessage)
- System.exit(1)
- }
-
- System.exit(0)
+ val args = new ClientArguments(argStrings, sparkConf)
+ new Client(args, sparkConf).run()
}
}
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a826b2a78a..d79b85e867 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -123,21 +123,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)
- // The Client object will call System.exit() after the job is done, and we don't want
- // that because it messes up the scalatest monitoring. So replicate some of what main()
- // does here.
val args = Array("--class", main,
"--jar", "file:" + fakeSparkJar.getAbsolutePath(),
"--arg", "yarn-cluster",
"--arg", result.getAbsolutePath(),
"--num-executors", "1")
- val sparkConf = new SparkConf()
- val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
- val clientArgs = new ClientArguments(args, sparkConf)
- new Client(clientArgs, yarnConf, sparkConf).run()
+ Client.main(args)
checkResult(result)
}
+ test("run Spark in yarn-cluster mode unsuccessfully") {
+ val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+
+ // Use only one argument so the driver will fail
+ val args = Array("--class", main,
+ "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+ "--arg", "yarn-cluster",
+ "--num-executors", "1")
+ val exception = intercept[SparkException] {
+ Client.main(args)
+ }
+ assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
+ }
+
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So