From bad0f7dbba2eda149ee4fc5810674d971d17874a Mon Sep 17 00:00:00 2001 From: "peng.zhang" Date: Fri, 1 Jul 2016 15:51:21 -0700 Subject: [SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher ## What changes were proposed in this pull request? Yarn cluster mode should return correct state for SparkLauncher ## How was this patch tested? unit test Author: peng.zhang Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state. --- .../spark/deploy/yarn/YarnClusterSuite.scala | 37 ++++++++++++++-------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'yarn/src/test/scala/org') diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 6b20dea590..9085fca1d3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite { finalState should be (SparkAppHandle.State.FAILED) } + test("run Spark in yarn-cluster mode failure after sc initialized") { + val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass)) + finalState should be (SparkAppHandle.State.FAILED) + } + test("run Python application in yarn-client mode") { testPySpark(true) } @@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener { } } +private object YarnClusterDriverWithFailure extends Logging with Matchers { + def main(args: Array[String]): Unit = { + val sc = new SparkContext(new SparkConf() + .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) + .setAppName("yarn test with failure")) + + throw new Exception("exception after sc initialized") + } +} + private object YarnClusterDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 @@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers { sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) data should be (Set(1, 2, 3, 4)) result = "success" + + // Verify that the config archive is correctly placed in the classpath of all containers. + val confFile = "/" + Client.SPARK_CONF_FILE + assert(getClass().getResource(confFile) != null) + val configFromExecutors = sc.parallelize(1 to 4, 4) + .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } + .collect() + assert(configFromExecutors.find(_ == null) === None) } finally { Files.write(result, status, StandardCharsets.UTF_8) sc.stop() } - // Verify that the config archive is correctly placed in the classpath of all containers. - val confFile = "/" + Client.SPARK_CONF_FILE - assert(getClass().getResource(confFile) != null) - val configFromExecutors = sc.parallelize(1 to 4, 4) - .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } - .collect() - assert(configFromExecutors.find(_ == null) === None) - // verify log urls are present val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] assert(listeners.size === 1) @@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers { } private object YarnClasspathTest extends Logging { - - var exitCode = 0 - def error(m: String, ex: Throwable = null): Unit = { logError(m, ex) // scalastyle:off println @@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging { } finally { sc.stop() } - System.exit(exitCode) } private def readResource(resultPath: String): Unit = { @@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging { } catch { case t: Throwable => error(s"loading test.resource to $resultPath", t) - // set the exit code if not yet set - exitCode = 2 } finally { Files.write(result, new File(resultPath), StandardCharsets.UTF_8) } -- cgit v1.2.3