author     peng.zhang <peng.zhang@xiaomi.com>  2016-07-01 15:51:21 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>  2016-07-01 15:51:21 -0700
commit     bad0f7dbba2eda149ee4fc5810674d971d17874a (patch)
tree       03aa741f1ed2e98333f8a7332d801df23b1c7b63 /yarn
parent     d17e5f2f123eecd5a7a1d87f5ce75a0fc44552b4 (diff)
[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher
## What changes were proposed in this pull request?

Yarn cluster mode should return the correct state to SparkLauncher.

## How was this patch tested?

Unit test.

Author: peng.zhang <peng.zhang@xiaomi.com>

Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.
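The fix matters to anyone driving yarn-cluster submissions through the launcher API. Below is a minimal sketch of how a caller observes these states via `SparkLauncher`; the jar path, main class, and object name are hypothetical, while `SparkLauncher`, `SparkAppHandle`, and its `Listener` are the real `org.apache.spark.launcher` API:

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherStateWatcher {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/example-app.jar") // hypothetical jar
      .setMainClass("com.example.Main")           // hypothetical main class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"launcher state: ${h.getState}")
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })

    // Before this patch, a yarn-cluster driver that failed could still be
    // reported here as FINISHED; with the fix, the final state distinguishes
    // FINISHED, FAILED, and KILLED.
    while (!handle.getState.isFinal) Thread.sleep(500)
    println(s"final state: ${handle.getState}")
  }
}
```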
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala            9
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  37
2 files changed, 31 insertions, 15 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d63579ff82..244d1a4e33 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1080,7 +1080,14 @@ private[spark] class Client(
         case YarnApplicationState.RUNNING =>
           reportLauncherState(SparkAppHandle.State.RUNNING)
         case YarnApplicationState.FINISHED =>
-          reportLauncherState(SparkAppHandle.State.FINISHED)
+          report.getFinalApplicationStatus match {
+            case FinalApplicationStatus.FAILED =>
+              reportLauncherState(SparkAppHandle.State.FAILED)
+            case FinalApplicationStatus.KILLED =>
+              reportLauncherState(SparkAppHandle.State.KILLED)
+            case _ =>
+              reportLauncherState(SparkAppHandle.State.FINISHED)
+          }
         case YarnApplicationState.FAILED =>
           reportLauncherState(SparkAppHandle.State.FAILED)
         case YarnApplicationState.KILLED =>
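The hunk above addresses the root cause: in yarn-cluster mode, an ApplicationMaster that runs to completion reports `YarnApplicationState.FINISHED` even when the user code failed or the application was killed; the actual outcome is only visible in YARN's `FinalApplicationStatus`. A standalone sketch of the same mapping (the object and method names are hypothetical, the enums are real):

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.spark.launcher.SparkAppHandle

object FinishedStateMapping {
  // For an app whose YarnApplicationState is FINISHED, derive the
  // launcher-visible state from the final application status instead of
  // reporting FINISHED unconditionally.
  def launcherStateForFinishedApp(status: FinalApplicationStatus): SparkAppHandle.State =
    status match {
      case FinalApplicationStatus.FAILED => SparkAppHandle.State.FAILED
      case FinalApplicationStatus.KILLED => SparkAppHandle.State.KILLED
      case _ => SparkAppHandle.State.FINISHED
    }
}
```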
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6b20dea590..9085fca1d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("run Spark in yarn-cluster mode failure after sc initialized") {
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
+    finalState should be (SparkAppHandle.State.FAILED)
+  }
+
   test("run Python application in yarn-client mode") {
     testPySpark(true)
   }
@@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener {
   }
 }
 
+private object YarnClusterDriverWithFailure extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test with failure"))
+
+    throw new Exception("exception after sc initialized")
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers {
       sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
       data should be (Set(1, 2, 3, 4))
       result = "success"
+
+      // Verify that the config archive is correctly placed in the classpath of all containers.
+      val confFile = "/" + Client.SPARK_CONF_FILE
+      assert(getClass().getResource(confFile) != null)
+      val configFromExecutors = sc.parallelize(1 to 4, 4)
+        .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
+        .collect()
+      assert(configFromExecutors.find(_ == null) === None)
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
       sc.stop()
     }
 
-    // Verify that the config archive is correctly placed in the classpath of all containers.
-    val confFile = "/" + Client.SPARK_CONF_FILE
-    assert(getClass().getResource(confFile) != null)
-    val configFromExecutors = sc.parallelize(1 to 4, 4)
-      .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
-      .collect()
-    assert(configFromExecutors.find(_ == null) === None)
-
     // verify log urls are present
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
@@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers {
 }
 
 private object YarnClasspathTest extends Logging {
-
-  var exitCode = 0
-
   def error(m: String, ex: Throwable = null): Unit = {
     logError(m, ex)
     // scalastyle:off println
@@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging {
     } finally {
       sc.stop()
     }
-    System.exit(exitCode)
   }
 
   private def readResource(resultPath: String): Unit = {
@@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging {
     } catch {
       case t: Throwable =>
         error(s"loading test.resource to $resultPath", t)
-        // set the exit code if not yet set
-        exitCode = 2
     } finally {
       Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
     }