From 323a83c5235f9289cd9526491d62365df96a429b Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Thu, 24 Jul 2014 14:46:10 -0500 Subject: [SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures Author: GuoQiang Li Closes #1180 from witgo/SPARK-2037 and squashes the following commits: 3d52411 [GuoQiang Li] review commit 7058f4d [GuoQiang Li] Correctly stop SparkContext 6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures --- .../cluster/YarnClientSchedulerBackend.scala | 28 ++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'yarn/common') diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 77b91f8e26..f8fb96b312 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + var checkerThread: Thread = null + var stopping: Boolean = false private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { @@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend( client = new Client(args, conf) appId = client.runApp() waitForApp() + checkerThread = yarnApplicationStateCheckerThread() } def waitForApp() { @@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend( } } + private def yarnApplicationStateCheckerThread(): Thread = { + val t = new Thread { + override def run() { + while (!stopping) { + val report = client.getApplicationReport(appId) + val state = report.getYarnApplicationState() + if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED + || state == YarnApplicationState.FAILED) { + logError(s"Yarn application already ended: $state") + sc.stop() + stopping = true + } + Thread.sleep(1000L) + } + checkerThread = null + Thread.currentThread().interrupt() + } + } + t.setName("Yarn Application State Checker") + t.setDaemon(true) + t.start() + t + } + override def stop() { + stopping = true super.stop() client.stop logInfo("Stopped") -- cgit v1.2.3