aboutsummaryrefslogtreecommitdiff
path: root/yarn/common
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2014-07-24 14:46:10 -0500
committerThomas Graves <tgraves@apache.org>2014-07-24 14:46:10 -0500
commit323a83c5235f9289cd9526491d62365df96a429b (patch)
tree159b8b0f8cd00bfca3ddca0dfd672198a33534d4 /yarn/common
parentc960b5051853f336fb01ea3f16567b9958baa1b6 (diff)
downloadspark-323a83c5235f9289cd9526491d62365df96a429b.tar.gz
spark-323a83c5235f9289cd9526491d62365df96a429b.tar.bz2
spark-323a83c5235f9289cd9526491d62365df96a429b.zip
[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures
Author: GuoQiang Li <witgo@qq.com> Closes #1180 from witgo/SPARK-2037 and squashes the following commits: 3d52411 [GuoQiang Li] review commit 7058f4d [GuoQiang Li] Correctly stop SparkContext 6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
Diffstat (limited to 'yarn/common')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala28
1 files changed, 28 insertions, 0 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 77b91f8e26..f8fb96b312 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
var client: Client = null
var appId: ApplicationId = null
+ var checkerThread: Thread = null
+ var stopping: Boolean = false
private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
arrayBuf: ArrayBuffer[String]) {
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
+ checkerThread = yarnApplicationStateCheckerThread()
}
def waitForApp() {
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
}
}
+ private def yarnApplicationStateCheckerThread(): Thread = {
+ val t = new Thread {
+ override def run() {
+ while (!stopping) {
+ val report = client.getApplicationReport(appId)
+ val state = report.getYarnApplicationState()
+ if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+ || state == YarnApplicationState.FAILED) {
+ logError(s"Yarn application already ended: $state")
+ sc.stop()
+ stopping = true
+ }
+ Thread.sleep(1000L)
+ }
+ checkerThread = null
+ Thread.currentThread().interrupt()
+ }
+ }
+ t.setName("Yarn Application State Checker")
+ t.setDaemon(true)
+ t.start()
+ t
+ }
+
override def stop() {
+ stopping = true
super.stop()
client.stop
logInfo("Stopped")