author    GuoQiang Li <witgo@qq.com>    2014-07-24 14:46:10 -0500
committer Thomas Graves <tgraves@apache.org>    2014-07-24 14:46:10 -0500
commit    323a83c5235f9289cd9526491d62365df96a429b (patch)
tree      159b8b0f8cd00bfca3ddca0dfd672198a33534d4 /yarn/stable
parent    c960b5051853f336fb01ea3f16567b9958baa1b6 (diff)
[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures
Author: GuoQiang Li <witgo@qq.com>

Closes #1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
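The heart of the patch is the failure threshold itself. Below is a minimal sketch of the default logic this commit adds to ExecutorLauncher, extracted into a standalone helper (the object name MaxFailuresSketch is illustrative, not part of Spark):

import org.apache.spark.SparkConf

// Sketch of the threshold computation introduced by this patch:
// prefer spark.yarn.max.executor.failures, fall back to the older
// spark.yarn.max.worker.failures key, else default to
// max(numExecutors * 2, 3).
object MaxFailuresSketch {
  def maxNumExecutorFailures(conf: SparkConf, numExecutors: Int): Int =
    conf.getInt("spark.yarn.max.executor.failures",
      conf.getInt("spark.yarn.max.worker.failures",
        math.max(numExecutors * 2, 3)))

  def main(args: Array[String]): Unit = {
    // With neither key set and 4 executors requested: max(4 * 2, 3) = 8
    println(maxNumExecutorFailures(new SparkConf(), numExecutors = 4))
  }
}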
Diffstat (limited to 'yarn/stable')
-rw-r--r--    yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala    45
1 file changed, 35 insertions, 10 deletions
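Before diving into the diff: a hypothetical usage sketch showing how an application would raise the threshold that this patch makes yarn-client mode honor (the app name and value 20 are made up for illustration):

import org.apache.spark.SparkConf

// Tolerate up to 20 failed executors before the ApplicationMaster
// finishes the application with FinalApplicationStatus.FAILED.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("failure-tolerant-app")
  .set("spark.yarn.max.executor.failures", "20")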
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7158d9442a..fc7b8320d7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
import java.net.Socket
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
-import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed:Boolean = false
+ private var driverClosed: Boolean = false
+ private var isFinished: Boolean = false
+ private var registered: Boolean = false
private var amClient: AMRMClient[ContainerRequest] = _
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
val securityManager = new SecurityManager(sparkConf)
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityManager)._1
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
amClient.start()
appAttemptId = ApplicationMaster.getApplicationAttemptId()
- registerApplicationMaster()
+ synchronized {
+ if (!isFinished) {
+ registerApplicationMaster()
+ registered = true
+ }
+ }
waitForSparkMaster()
addAmIpFilter()
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(100)
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ }
+ }
+
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
while (!driverClosed) {
+ checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")
yarnAllocator.allocateResources()
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
t
}
- def finishApplicationMaster(status: FinalApplicationStatus) {
- logInfo("Unregistering ApplicationMaster with " + status)
- val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
- amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+ def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ logInfo("Unregistering ApplicationMaster with " + status)
+ if (registered) {
+ val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+ amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+ }
+ isFinished = true
+ }
}
}
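The synchronized blocks above make registration and finish mutually exclusive: finishApplicationMaster becomes a no-op after the first call, and it only unregisters if registerApplicationMaster actually ran. A stripped-down sketch of that guard pattern (class and method names here are illustrative, not Spark's):

// register() is skipped if finish() already won the race;
// finish() runs its side effect at most once, and only unregisters
// when registration actually completed.
class LifecycleGuard {
  private var registered = false
  private var finished = false

  def register(doRegister: () => Unit): Unit = synchronized {
    if (!finished) {
      doRegister()
      registered = true
    }
  }

  def finish(doUnregister: () => Unit): Unit = synchronized {
    if (!finished) {
      if (registered) {
        doUnregister()
      }
      finished = true
    }
  }
}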