diff options
author | peng.zhang <peng.zhang@xiaomi.com> | 2016-07-01 15:51:21 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-07-01 15:51:21 -0700 |
commit | bad0f7dbba2eda149ee4fc5810674d971d17874a (patch) | |
tree | 03aa741f1ed2e98333f8a7332d801df23b1c7b63 /yarn/src/test | |
parent | d17e5f2f123eecd5a7a1d87f5ce75a0fc44552b4 (diff) | |
download | spark-bad0f7dbba2eda149ee4fc5810674d971d17874a.tar.gz spark-bad0f7dbba2eda149ee4fc5810674d971d17874a.tar.bz2 spark-bad0f7dbba2eda149ee4fc5810674d971d17874a.zip |
[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher
## What changes were proposed in this pull request?
Yarn cluster mode should return correct state for SparkLauncher
## How was this patch tested?
unit test
Author: peng.zhang <peng.zhang@xiaomi.com>
Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.
Diffstat (limited to 'yarn/src/test')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 6b20dea590..9085fca1d3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite { finalState should be (SparkAppHandle.State.FAILED) } + test("run Spark in yarn-cluster mode failure after sc initialized") { + val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass)) + finalState should be (SparkAppHandle.State.FAILED) + } + test("run Python application in yarn-client mode") { testPySpark(true) } @@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener { } } +private object YarnClusterDriverWithFailure extends Logging with Matchers { + def main(args: Array[String]): Unit = { + val sc = new SparkContext(new SparkConf() + .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) + .setAppName("yarn test with failure")) + + throw new Exception("exception after sc initialized") + } +} + private object YarnClusterDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 @@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers { sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) data should be (Set(1, 2, 3, 4)) result = "success" + + // Verify that the config archive is correctly placed in the classpath of all containers. + val confFile = "/" + Client.SPARK_CONF_FILE + assert(getClass().getResource(confFile) != null) + val configFromExecutors = sc.parallelize(1 to 4, 4) + .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } + .collect() + assert(configFromExecutors.find(_ == null) === None) } finally { Files.write(result, status, StandardCharsets.UTF_8) sc.stop() } - // Verify that the config archive is correctly placed in the classpath of all containers. - val confFile = "/" + Client.SPARK_CONF_FILE - assert(getClass().getResource(confFile) != null) - val configFromExecutors = sc.parallelize(1 to 4, 4) - .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } - .collect() - assert(configFromExecutors.find(_ == null) === None) - // verify log urls are present val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] assert(listeners.size === 1) @@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers { } private object YarnClasspathTest extends Logging { - - var exitCode = 0 - def error(m: String, ex: Throwable = null): Unit = { logError(m, ex) // scalastyle:off println @@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging { } finally { sc.stop() } - System.exit(exitCode) } private def readResource(resultPath: String): Unit = { @@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging { } catch { case t: Throwable => error(s"loading test.resource to $resultPath", t) - // set the exit code if not yet set - exitCode = 2 } finally { Files.write(result, new File(resultPath), StandardCharsets.UTF_8) } |