-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala                80
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  28
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala               45
3 files changed, 115 insertions, 38 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d232c18d2f..184e2ad6c8 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
-import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +56,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed:Boolean = false
+
+ private var driverClosed: Boolean = false
+ private var isFinished: Boolean = false
+ private var registered: Boolean = false
+
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
val securityManager = new SecurityManager(sparkConf)
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityManager)._1
var actor: ActorRef = _
@@ -97,23 +103,26 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnAllocationHandler.MEMORY_OVERHEAD)
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
+ synchronized {
+ if (!isFinished) {
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ // Compute number of threads for akka
+ val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+
+ if (minimumMemory > 0) {
+ val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnAllocationHandler.MEMORY_OVERHEAD)
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+
+ if (numCore > 0) {
+ // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
+ // args.workerCores = numCore
+ }
+ }
+ registered = true
}
}
-
waitForSparkMaster()
addAmIpFilter()
// Allocate all containers
@@ -243,11 +252,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+ checkNumExecutorsFailed()
Thread.sleep(100)
}
logInfo("All executors have launched.")
-
+ }
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ }
}
// TODO: We might want to extend this to allocate more containers in case they die !
@@ -257,6 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
+ checkNumExecutorsFailed()
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
logInfo("Allocating " + missingExecutorCount +
@@ -282,15 +298,23 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
yarnAllocator.allocateContainers(0)
}
- def finishApplicationMaster(status: FinalApplicationStatus) {
-
- logInfo("finish ApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- finishReq.setTrackingUrl(sparkConf.get("spark.driver.appUIHistoryAddress", ""))
- resourceManager.finishApplicationMaster(finishReq)
+ def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ logInfo("Unregistering ApplicationMaster with " + status)
+ if (registered) {
+ val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+ .asInstanceOf[FinishApplicationMasterRequest]
+ finishReq.setAppAttemptId(appAttemptId)
+ finishReq.setFinishApplicationStatus(status)
+ finishReq.setTrackingUrl(sparkConf.get("spark.yarn.historyServer.address", ""))
+ finishReq.setDiagnostics(appMessage)
+ resourceManager.finishApplicationMaster(finishReq)
+ }
+ isFinished = true
+ }
}
}
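
The alpha launcher above serializes registration and teardown on the object lock: registration is skipped if something already finished the AM, and finishApplicationMaster unregisters at most once, and only if registration actually happened. A minimal standalone sketch of that guard (the `unregister` callback is a stand-in for the real FinishApplicationMasterRequest round trip, not part of the patch):

    // Hypothetical illustration of the registered/isFinished guard used above.
    class FinishGuard(unregister: String => Unit) {
      private var registered = false
      private var isFinished = false

      def registerOnce(doRegister: () => Unit): Unit = synchronized {
        if (!isFinished) {        // skip registration if the AM was already torn down
          doRegister()
          registered = true
        }
      }

      def finish(appMessage: String = ""): Unit = synchronized {
        if (!isFinished) {        // idempotent: only the first caller unregisters
          if (registered) {       // never unregister an AM that never registered
            unregister(appMessage)
          }
          isFinished = true
        }
      }
    }
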
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 77b91f8e26..f8fb96b312 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -37,6 +37,8 @@ private[spark] class YarnClientSchedulerBackend(
var client: Client = null
var appId: ApplicationId = null
+ var checkerThread: Thread = null
+ var stopping: Boolean = false
private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
arrayBuf: ArrayBuffer[String]) {
@@ -86,6 +88,7 @@ private[spark] class YarnClientSchedulerBackend(
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
+ checkerThread = yarnApplicationStateCheckerThread()
}
def waitForApp() {
@@ -116,7 +119,32 @@ private[spark] class YarnClientSchedulerBackend(
}
}
+ private def yarnApplicationStateCheckerThread(): Thread = {
+ val t = new Thread {
+ override def run() {
+ while (!stopping) {
+ val report = client.getApplicationReport(appId)
+ val state = report.getYarnApplicationState()
+ if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+ || state == YarnApplicationState.FAILED) {
+ logError(s"Yarn application already ended: $state")
+ sc.stop()
+ stopping = true
+ }
+ Thread.sleep(1000L)
+ }
+ checkerThread = null
+ Thread.currentThread().interrupt()
+ }
+ }
+ t.setName("Yarn Application State Checker")
+ t.setDaemon(true)
+ t.start()
+ t
+ }
+
override def stop() {
+ stopping = true
super.stop()
client.stop
logInfo("Stopped")
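
In yarn-client mode the SparkContext lives outside YARN, so the backend above starts a daemon thread that polls the ResourceManager and shuts the context down if the application reaches a terminal state on its own. A hedged sketch of the same polling pattern, with fetchState and onTerminal standing in for client.getApplicationReport(appId).getYarnApplicationState() and the logError/sc.stop() pair:

    // Illustrative only: the names and string states are placeholders, not Spark API.
    class AppStateChecker(fetchState: () => String, onTerminal: String => Unit) {
      @volatile private var stopping = false

      private val checker = new Thread("Yarn Application State Checker") {
        setDaemon(true)                      // never keep the driver JVM alive for the poller
        override def run(): Unit = {
          while (!stopping) {
            val state = fetchState()
            if (state == "FINISHED" || state == "KILLED" || state == "FAILED") {
              onTerminal(state)              // e.g. log the error and stop the SparkContext
              stopping = true
            }
            Thread.sleep(1000L)              // poll roughly once per second, as above
          }
        }
      }
      checker.start()

      def stop(): Unit = { stopping = true } // mirrors setting `stopping` in the backend's stop()
    }
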
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7158d9442a..fc7b8320d7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -19,15 +19,12 @@ package org.apache.spark.deploy.yarn
import java.net.Socket
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
-import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -57,10 +54,16 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = _
- private var driverClosed:Boolean = false
+ private var driverClosed: Boolean = false
+ private var isFinished: Boolean = false
+ private var registered: Boolean = false
private var amClient: AMRMClient[ContainerRequest] = _
+ // Default to numExecutors * 2, with minimum of 3
+ private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+ sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
+
val securityManager = new SecurityManager(sparkConf)
val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityManager)._1
@@ -101,7 +104,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
amClient.start()
appAttemptId = ApplicationMaster.getApplicationAttemptId()
- registerApplicationMaster()
+ synchronized {
+ if (!isFinished) {
+ registerApplicationMaster()
+ registered = true
+ }
+ }
waitForSparkMaster()
addAmIpFilter()
@@ -210,6 +218,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+ checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(100)
@@ -228,12 +237,20 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}
+ private def checkNumExecutorsFailed() {
+ if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of executor failures reached")
+ }
+ }
+
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
while (!driverClosed) {
+ checkNumExecutorsFailed()
allocateMissingExecutor()
logDebug("Sending progress")
yarnAllocator.allocateResources()
@@ -248,10 +265,18 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
t
}
- def finishApplicationMaster(status: FinalApplicationStatus) {
- logInfo("Unregistering ApplicationMaster with " + status)
- val trackingUrl = sparkConf.get("spark.driver.appUIHistoryAddress", "")
- amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
+ def finishApplicationMaster(status: FinalApplicationStatus, appMessage: String = "") {
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ logInfo("Unregistering ApplicationMaster with " + status)
+ if (registered) {
+ val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+ amClient.unregisterApplicationMaster(status, appMessage, trackingUrl)
+ }
+ isFinished = true
+ }
}
}
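
Both launchers now resolve the failure threshold the same way: the new spark.yarn.max.executor.failures key wins, the deprecated spark.yarn.max.worker.failures key is the fallback, and the default is twice the requested executors with a floor of three. A small, hypothetical SparkConf-only check of that arithmetic (numExecutors here is a placeholder, not args.numExecutors):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
    val numExecutors = 2   // placeholder for args.numExecutors
    // Same lookup order as in the patch: new key, deprecated key, computed default.
    val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
      sparkConf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))
    // With neither key set and two executors requested, this yields 4; with one executor, 3.
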