aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormcheah <mcheah@palantir.com>2015-09-10 11:58:54 -0700
committerAndrew Or <andrew@databricks.com>2015-09-10 11:58:54 -0700
commitaf3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 (patch)
tree979a7e64505f9ccdabf98148b8f8e9e745448e65
parenta76bde9dae54c4641e21f3c1ceb4870e3dc91881 (diff)
downloadspark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.tar.gz
spark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.tar.bz2
spark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.zip
[SPARK-8167] Make tasks that fail from YARN preemption not fail job
The architecture is that, in YARN mode, if the driver detects that an executor has disconnected, it asks the ApplicationMaster why the executor died. If the ApplicationMaster is aware that the executor died because of preemption, all tasks associated with that executor are not marked as failed. The executor is still removed from the driver's list of available executors, however. There's a few open questions: 1. Should standalone mode have a similar "get executor loss reason" as well? I localized this change as much as possible to affect only YARN, but there could be a valid case to differentiate executor losses in standalone mode as well. 2. I make a pretty strong assumption in YarnAllocator that getExecutorLossReason(executorId) will only be called once per executor id; I do this so that I can remove the metadata from the in-memory map to avoid object accumulation. It's not clear if I'm being overly zealous to save space, however. cc vanzin specifically for review because it collided with some earlier YARN scheduling work. cc JoshRosen because it's similar to output commit coordination we did in the past cc andrewor14 for our discussion on how to get executor exit codes and loss reasons Author: mcheah <mcheah@palantir.com> Closes #8007 from mccheah/feature/preemption-handling.
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Pool.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala77
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala7
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala92
17 files changed, 261 insertions, 79 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00dc70..2ae878b3e6 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,6 +48,8 @@ case object Success extends TaskEndReason
sealed trait TaskFailedReason extends TaskEndReason {
/** Error message displayed in the web UI. */
def toErrorString: String
+
+ def shouldEventuallyFailJob: Boolean = true
}
/**
@@ -194,6 +196,12 @@ case object TaskKilled extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ /**
+ * If a task failed because its attempt to commit was denied, do not count this failure
+ * towards failing the stage. This is intended to prevent spurious stage failures in cases
+ * where many speculative tasks are launched and denied to commit.
+ */
+ override def shouldEventuallyFailJob: Boolean = false
}
/**
@@ -202,8 +210,14 @@ case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extend
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
- override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
+case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val exitBehavior = if (isNormalExit) "normally" else "abnormally"
+ s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+ }
+
+ override def shouldEventuallyFailJob: Boolean = !isNormalExit
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2bc43a9186..0a98c69b89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -23,16 +23,20 @@ import org.apache.spark.executor.ExecutorExitCode
* Represents an explanation for a executor or whole slave failing or exiting.
*/
private[spark]
-class ExecutorLossReason(val message: String) {
+class ExecutorLossReason(val message: String) extends Serializable {
override def toString: String = message
}
private[spark]
-case class ExecutorExited(val exitCode: Int)
- extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+ extends ExecutorLossReason(reason)
+
+private[spark] object ExecutorExited {
+ def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
+ ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+ }
}
private[spark]
case class SlaveLost(_message: String = "Slave lost")
- extends ExecutorLossReason(_message) {
-}
+ extends ExecutorLossReason(_message)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5821afea98..551e39a81b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -83,8 +83,8 @@ private[spark] class Pool(
null
}
- override def executorLost(executorId: String, host: String) {
- schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+ override def executorLost(executorId: String, host: String, reason: ExecutorLossReason) {
+ schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
override def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e6..ab00bc8f0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -42,7 +42,7 @@ private[spark] trait Schedulable {
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
- def executorLost(executorId: String, host: String): Unit
+ def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def checkSpeculatableTasks(): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1705e7f962..1c7bfe89c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -332,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {
- removeExecutor(execId)
+ removeExecutor(execId,
+ SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
}
}
@@ -464,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
if (activeExecutorIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
- removeExecutor(executorId)
+ removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
// We may get multiple executorLost() calls with different loss reasons. For example, one
@@ -482,7 +483,7 @@ private[spark] class TaskSchedulerImpl(
}
/** Remove an executor from all our data structures and mark it as lost */
- private def removeExecutor(executorId: String) {
+ private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -497,7 +498,7 @@ private[spark] class TaskSchedulerImpl(
}
}
executorIdToHost -= executorId
- rootPool.executorLost(executorId, host)
+ rootPool.executorLost(executorId, host, reason)
}
def executorAdded(execId: String, host: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d67f..62af9031b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
}
ef.exception
+ case e: ExecutorLostFailure if e.isNormalExit =>
+ logInfo(s"Task $tid failed because while it was being computed, its executor" +
+ s" exited normally. Not marking the task as failed.")
+ None
+
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
None
@@ -722,10 +727,9 @@ private[spark] class TaskSetManager(
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
- if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
- // If a task failed because its attempt to commit was denied, do not count this failure
- // towards failing the stage. This is intended to prevent spurious stage failures in cases
- // where many speculative tasks are launched and denied to commit.
+ if (!isZombie && state != TaskState.KILLED
+ && reason.isInstanceOf[TaskFailedReason]
+ && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
@@ -778,7 +782,7 @@ private[spark] class TaskSetManager(
}
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
- override def executorLost(execId: String, host: String) {
+ override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
@@ -809,9 +813,12 @@ private[spark] class TaskSetManager(
}
}
}
- // Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
+ val isNormalExit: Boolean = reason match {
+ case exited: ExecutorExited => exited.isNormalExit
+ case _ => false
+ }
+ handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 06f5438433..d947436777 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.{SerializableBuffer, Utils}
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -70,7 +71,8 @@ private[spark] object CoarseGrainedClusterMessages {
case object StopExecutors extends CoarseGrainedClusterMessage
- case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+ case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
+ extends CoarseGrainedClusterMessage
case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
@@ -92,6 +94,10 @@ private[spark] object CoarseGrainedClusterMessages {
hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage
+ // Check if an executor was force-killed but for a normal reason.
+ // This could be the case if the executor is preempted, for instance.
+ case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
+
case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f96..18771f79b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rpc._
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
/**
@@ -82,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override protected def log = CoarseGrainedSchedulerBackend.this.log
- private val addressToExecutorId = new HashMap[RpcAddress, String]
+ protected val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@@ -128,6 +129,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
@@ -185,8 +187,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
- "remote Rpc client disassociated"))
+ addressToExecutorId
+ .get(remoteAddress)
+ .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
}
// Make fake resource offers on just one executor
@@ -227,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: String): Unit = {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -239,9 +242,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ scheduler.executorLost(executorId, reason)
listenerBus.post(
- SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
+ SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None => logInfo(s"Asked to remove non-existent executor $executorId")
}
}
@@ -263,8 +266,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// TODO (prashant) send conf instead of properties
- driverEndpoint = rpcEnv.setupEndpoint(
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
+ driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+ }
+
+ protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new DriverEndpoint(rpcEnv, properties)
}
def stopExecutors() {
@@ -304,7 +310,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Called by subclasses when notified of a lost worker
- def removeExecutor(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason) {
try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bbe51b4a09..27491ecf8b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rpc.RpcAddress
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
@@ -135,11 +135,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
- case Some(code) => ExecutorExited(code)
+ case Some(code) => ExecutorExited(code, isNormalExit = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
- removeExecutor(fullId.split("/")(1), reason.toString)
+ removeExecutor(fullId.split("/")(1), reason)
}
override def sufficientResourcesRegistered(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 044f6288fa..6a4b536dee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,12 +17,13 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, ExecutionContext}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{ThreadUtils, RpcUtils}
@@ -43,8 +44,10 @@ private[spark] abstract class YarnSchedulerBackend(
protected var totalExpectedExecutors = 0
- private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
- YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
+ private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+ private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+ YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
@@ -53,7 +56,7 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
@@ -61,7 +64,7 @@ private[spark] abstract class YarnSchedulerBackend(
* Request that the ApplicationMaster kill the specified executors.
*/
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -90,6 +93,41 @@ private[spark] abstract class YarnSchedulerBackend(
}
}
+ override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new YarnDriverEndpoint(rpcEnv, properties)
+ }
+
+ /**
+ * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
+ * This endpoint communicates with the executors and queries the AM for an executor's exit
+ * status when the executor is disconnected.
+ */
+ private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+ /**
+ * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
+ * handles it by assuming the Executor was lost for a bad reason and removes the executor
+ * immediately.
+ *
+ * In YARN's case however it is crucial to talk to the application master and ask why the
+ * executor had exited. In particular, the executor may have exited due to the executor
+ * having been preempted. If the executor "exited normally" according to the application
+ * master then we pass that information down to the TaskSetManager to inform the
+ * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+ *
+ * TODO there's a race condition where while we are querying the ApplicationMaster for
+ * the executor loss reason, there is the potential that tasks will be scheduled on
+ * the executor that failed. We should fix this by having this onDisconnected event
+ * also "blacklist" executors so that tasks are not assigned to them.
+ */
+ override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+ addressToExecutorId.get(rpcAddress).foreach { executorId =>
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
+ }
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the ApplicationMaster.
*/
@@ -101,6 +139,33 @@ private[spark] abstract class YarnSchedulerBackend(
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+ executorId: String,
+ executorRpcAddress: RpcAddress): Unit = {
+ amEndpoint match {
+ case Some(am) =>
+ val lossReasonRequest = GetExecutorLossReason(executorId)
+ val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ future onSuccess {
+ case reason: ExecutorLossReason => {
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
+ }
+ }
+ future onFailure {
+ case NonFatal(e) => {
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
+ }
+ case t => throw t
+ }
+ case None =>
+ logWarning("Attempted to check for an executor loss reason" +
+ " before the AM has registered!")
+ }
+ }
+
override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
@@ -113,6 +178,7 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutor(executorId, reason)
}
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case r: RequestExecutors =>
amEndpoint match {
@@ -143,7 +209,6 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}
-
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 452c32d541..65df887477 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
- removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+ removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2e424054be..18da6d2491 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -390,7 +390,7 @@ private[spark] class MesosSchedulerBackend(
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
+ recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd6d5..24f78744ad 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,8 +362,9 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
- case ExecutorLostFailure(executorId) =>
- ("Executor ID" -> executorId)
+ case ExecutorLostFailure(executorId, isNormalExit) =>
+ ("Executor ID" -> executorId) ~
+ ("Normal Exit" -> isNormalExit)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -794,8 +795,10 @@ private[spark] object JsonProtocol {
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
+ val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
+ map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"))
+ ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
case `unknownReason` => UnknownReason
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index edbdb485c5..f0eadf2409 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -334,7 +334,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Now mark host2 as dead
sched.removeExecutor("exec2")
- manager.executorLost("exec2", "host2")
+ manager.executorLost("exec2", "host2", SlaveLost())
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
@@ -504,13 +504,36 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
- manager.executorLost("execC", "host2")
+ manager.executorLost("execC", "host2", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
sched.removeExecutor("execD")
- manager.executorLost("execD", "host1")
+ manager.executorLost("execD", "host1", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
+ test("Executors are added but exit normally while running tasks") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execB")),
+ Seq(TaskLocation("host2", "execC")),
+ Seq())
+ val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+ sched.addExecutor("execA", "host1")
+ manager.executorAdded()
+ sched.addExecutor("execC", "host2")
+ manager.executorAdded()
+ assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
+ sched.removeExecutor("execA")
+ manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+ assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+ sched.removeExecutor("execC")
+ manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+
test("test RACK_LOCAL tasks") {
// Assign host1 to rack1
FakeRackUtil.assignHostToRack("host1", "rack1")
@@ -721,8 +744,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
- manager.executorLost("execA", "host1")
- manager.executorLost("execB.2", "host2")
+ manager.executorLost("execA", "host1", SlaveLost())
+ manager.executorLost("execB.2", "host2", SlaveLost())
clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a4139b0..47e548ef0d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,7 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
- testTaskEndReason(ExecutorLostFailure("100"))
+ testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)
// BlockId
@@ -295,10 +295,10 @@ class JsonProtocolSuite extends SparkFunSuite {
test("ExecutorLostFailure backward compatibility") {
// ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
- val executorLostFailure = ExecutorLostFailure("100")
+ val executorLostFailure = ExecutorLostFailure("100", true)
val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
.removeField({ _._1 == "Executor ID" })
- val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+ val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
@@ -577,8 +577,10 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
- case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+ case (ExecutorLostFailure(execId1, isNormalExit1),
+ ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)
+ assert(isNormalExit1 === isNormalExit2)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5cec00..93621b44c9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -590,6 +590,13 @@ private[spark] class ApplicationMaster(
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
context.reply(true)
+
+ case GetExecutorLossReason(eid) =>
+ Option(allocator) match {
+ case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
+ case None => logWarning(s"Container allocator is not ready to find" +
+ s" executor loss reasons yet.")
+ }
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5f897cbcb4..fd88b8b7fe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,8 +21,9 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
-import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConverters._
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -36,8 +37,9 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.util.Utils
/**
@@ -93,6 +95,11 @@ private[yarn] class YarnAllocator(
sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
}
+ // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
+ // list of requesters that should be responded to once we find out why the given executor
+ // was lost.
+ private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -235,9 +242,7 @@ private[yarn] class YarnAllocator(
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
-
processCompletedContainers(completedContainers.asScala)
-
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
}
@@ -429,7 +434,7 @@ private[yarn] class YarnAllocator(
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
- if (!alreadyReleased) {
+ val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
@@ -440,22 +445,42 @@ private[yarn] class YarnAllocator(
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
- logInfo("Container preempted: " + containerId)
- } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- VMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- PMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus != 0) {
- logInfo("Container marked as failed: " + containerId +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
- numExecutorsFailed += 1
+ val exitStatus = completedContainer.getExitStatus
+ val (isNormalExit, containerExitReason) = exitStatus match {
+ case ContainerExitStatus.SUCCESS =>
+ (true, s"Executor for container $containerId exited normally.")
+ case ContainerExitStatus.PREEMPTED =>
+ // Preemption should count as a normal exit, since YARN preempts containers merely
+ // to do resource sharing, and tasks that fail due to preempted executors could
+ // just as easily finish on any other executor. See SPARK-8167.
+ (true, s"Container $containerId was preempted.")
+ // Should probably still count memory exceeded exit codes towards task failures
+ case VMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ case PMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ case unknown =>
+ numExecutorsFailed += 1
+ (false, "Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+
+ }
+ if (isNormalExit) {
+ logInfo(containerExitReason)
+ } else {
+ logWarning(containerExitReason)
}
+ ExecutorExited(0, isNormalExit, containerExitReason)
+ } else {
+ // If we have already released this container, then it must mean
+ // that the driver has explicitly requested it to be killed
+ ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+ s"Container $containerId exited from explicit termination request.")
}
if (allocatedContainerToHostMap.contains(containerId)) {
@@ -474,18 +499,35 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
-
+ pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+ }
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
numUnexpectedContainerRelease += 1
- driverRef.send(RemoveExecutor(eid,
- s"Yarn deallocated the executor $eid (container $containerId)"))
+ driverRef.send(RemoveExecutor(eid, exitReason))
}
}
}
}
+ /**
+ * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
+ * we can only find the loss reason to send back in the next call to allocateResources().
+ */
+ private[yarn] def enqueueGetLossReasonRequest(
+ eid: String,
+ context: RpcCallContext): Unit = synchronized {
+ if (executorIdToContainer.contains(eid)) {
+ pendingLossReasonRequests
+ .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else {
+ logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ }
+ }
+
private def internalReleaseContainer(container: Container): Unit = {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
@@ -501,6 +543,8 @@ private object YarnAllocator {
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
val VMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+ val VMEM_EXCEEDED_EXIT_CODE = -103
+ val PMEM_EXCEEDED_EXIT_CODE = -104
def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
val matcher = pattern.matcher(diagnostics)