diff options
author | GuoQiang Li <witgo@qq.com> | 2014-07-24 14:46:10 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-07-24 14:46:10 -0500 |
commit | 323a83c5235f9289cd9526491d62365df96a429b (patch) | |
tree | 159b8b0f8cd00bfca3ddca0dfd672198a33534d4 /yarn/common | |
parent | c960b5051853f336fb01ea3f16567b9958baa1b6 (diff) | |
download | spark-323a83c5235f9289cd9526491d62365df96a429b.tar.gz spark-323a83c5235f9289cd9526491d62365df96a429b.tar.bz2 spark-323a83c5235f9289cd9526491d62365df96a429b.zip |
[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures
Author: GuoQiang Li <witgo@qq.com>
Closes #1180 from witgo/SPARK-2037 and squashes the following commits:
3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
Diffstat (limited to 'yarn/common')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 28 |
1 files changed, 28 insertions, 0 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 77b91f8e26..f8fb96b312 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + var checkerThread: Thread = null + var stopping: Boolean = false private[spark] def addArg(optionName: String, envVar: String, sysProp: String, arrayBuf: ArrayBuffer[String]) { @@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend( client = new Client(args, conf) appId = client.runApp() waitForApp() + checkerThread = yarnApplicationStateCheckerThread() } def waitForApp() { @@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend( } } + private def yarnApplicationStateCheckerThread(): Thread = { + val t = new Thread { + override def run() { + while (!stopping) { + val report = client.getApplicationReport(appId) + val state = report.getYarnApplicationState() + if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED + || state == YarnApplicationState.FAILED) { + logError(s"Yarn application already ended: $state") + sc.stop() + stopping = true + } + Thread.sleep(1000L) + } + checkerThread = null + Thread.currentThread().interrupt() + } + } + t.setName("Yarn Application State Checker") + t.setDaemon(true) + t.start() + t + } + override def stop() { + stopping = true super.stop() client.stop logInfo("Stopped") |