author     GuoQiang Li <witgo@qq.com>  2014-07-24 14:46:10 -0500
committer  Thomas Graves <tgraves@apache.org>  2014-07-24 14:46:10 -0500
commit     323a83c5235f9289cd9526491d62365df96a429b
tree       159b8b0f8cd00bfca3ddca0dfd672198a33534d4 /yarn/alpha
parent     c960b5051853f336fb01ea3f16567b9958baa1b6
[SPARK-2037]: yarn client mode doesn't support spark.yarn.max.executor.failures
Author: GuoQiang Li <witgo@qq.com>

Closes #1180 from witgo/SPARK-2037 and squashes the following commits:

3d52411 [GuoQiang Li] review commit
7058f4d [GuoQiang Li] Correctly stop SparkContext
6d0561f [GuoQiang Li] Fix: yarn client mode doesn't support spark.yarn.max.executor.failures
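The fix routes yarn-client mode through the same executor-failure limit that cluster mode already honored. A minimal sketch of the fallback chain for that limit, assuming a plain Map in place of SparkConf (that substitution, and the local getInt helper, are illustrative, not Spark API):

    // Sketch of the fallback the patch introduces: prefer the new key,
    // fall back to the deprecated worker-based key, then to max(2n, 3).
    object MaxFailuresExample {
      def maxNumExecutorFailures(conf: Map[String, String], numExecutors: Int): Int = {
        val default = math.max(numExecutors * 2, 3)
        def getInt(key: String, fallback: Int): Int =
          conf.get(key).map(_.toInt).getOrElse(fallback)
        getInt("spark.yarn.max.executor.failures",
          getInt("spark.yarn.max.worker.failures", default))
      }

      def main(args: Array[String]): Unit = {
        // Neither key set, 4 executors: the limit defaults to max(8, 3) = 8.
        println(maxNumExecutorFailures(Map.empty, numExecutors = 4))        // 8
        // The deprecated key is still honored when the new one is absent.
        println(maxNumExecutorFailures(
          Map("spark.yarn.max.worker.failures" -> "5"), numExecutors = 4))  // 5
      }
    }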
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala  80
1 file changed, 52 insertions(+), 28 deletions(-)
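One detail the refactor preserves is the akka thread-count computation in the registration block, which rounds the memory ratio up with integer arithmetic. A worked instance of that ceiling division, with hypothetical values:

    // Ceiling division exactly as written in the patch:
    // mem / minimumMemory, plus 1 if there is a remainder.
    val mem = 1408           // executor memory + overhead, in MB (illustrative)
    val minimumMemory = 1024 // RM minimum resource capability, in MB (illustrative)
    val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
    // numCore == 2: 1408 / 1024 = 1 with remainder 384, so round up.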
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d232c18d2f..184e2ad6c8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
-import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed:Boolean = false
+
+ private var driverClosed: Boolean = false
+ private var isFinished: Boolean = false
+ private var registered: Boolean = false
+
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
val securityManager = new SecurityManager(sparkConf)
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
+ synchronized {
+ if (!isFinished) {
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ // Compute number of threads for akka
+ val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+ if (minimumMemory > 0) {
+ val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ }
+ }
+ registered = true
}
}
-
waitForSparkMaster()
addAmIpFilter()
// Allocate all containers
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+ checkNumExecutorsFailed()
Thread.sleep(100)
}
logInfo("All executors have launched.")
-
+ }
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ }
}
// TODO: We might want to extend this to allocate more containers in case they die !
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
+ checkNumExecutorsFailed()
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating " + missingExecutorCount +
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
yarnAllocator.allocateContainers(0)
}
- def finishApplicationMaster(status: FinalApplicationStatus) {
-
- logInfo("finish ApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
- resourceManager.finishApplicationMaster(finishReq)
+ def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ logInfo("Unregistering ApplicationMaster with " + status)
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+ finishReq.setDiagnostics(appMessage)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+ isFinished = true
+ }
}
}
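The rewritten finishApplicationMaster is an instance of a common idempotent-shutdown pattern: a synchronized check of isFinished so that concurrent callers (the allocation loop and the monitor thread both call checkNumExecutorsFailed) unregister at most once, and only if registration actually happened. A stripped-down sketch of the same pattern, with the YARN round trip replaced by a placeholder unregister method (not Spark API):

    // Sketch of the guard pattern used by the patch.
    class ShutdownGuard {
      private var isFinished = false
      private var registered = false

      def markRegistered(): Unit = synchronized { registered = true }

      def finish(status: String, message: String = ""): Unit = synchronized {
        if (isFinished) return     // a second caller is a no-op
        if (registered) {          // never unregister an AM that never registered
          unregister(status, message)
        }
        isFinished = true          // set last, still inside the lock
      }

      // Placeholder for the FinishApplicationMasterRequest round trip.
      private def unregister(status: String, message: String): Unit =
        println(s"unregistering with $status: $message")
    }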