aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authormcheah <mcheah@palantir.com>2015-09-10 11:58:54 -0700
committerAndrew Or <andrew@databricks.com>2015-09-10 11:58:54 -0700
commitaf3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 (patch)
tree979a7e64505f9ccdabf98148b8f8e9e745448e65 /core
parenta76bde9dae54c4641e21f3c1ceb4870e3dc91881 (diff)
downloadspark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.tar.gz
spark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.tar.bz2
spark-af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0.zip
[SPARK-8167] Make tasks that fail from YARN preemption not fail job
The architecture is that, in YARN mode, if the driver detects that an executor has disconnected, it asks the ApplicationMaster why the executor died. If the ApplicationMaster is aware that the executor died because of preemption, all tasks associated with that executor are not marked as failed. The executor is still removed from the driver's list of available executors, however. There's a few open questions: 1. Should standalone mode have a similar "get executor loss reason" as well? I localized this change as much as possible to affect only YARN, but there could be a valid case to differentiate executor losses in standalone mode as well. 2. I make a pretty strong assumption in YarnAllocator that getExecutorLossReason(executorId) will only be called once per executor id; I do this so that I can remove the metadata from the in-memory map to avoid object accumulation. It's not clear if I'm being overly zealous to save space, however. cc vanzin specifically for review because it collided with some earlier YARN scheduling work. cc JoshRosen because it's similar to output commit coordination we did in the past cc andrewor14 for our discussion on how to get executor exit codes and loss reasons Author: mcheah <mcheah@palantir.com> Closes #8007 from mccheah/feature/preemption-handling.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Pool.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala77
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala10
15 files changed, 186 insertions, 55 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00dc70..2ae878b3e6 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,6 +48,8 @@ case object Success extends TaskEndReason
sealed trait TaskFailedReason extends TaskEndReason {
/** Error message displayed in the web UI. */
def toErrorString: String
+
+ def shouldEventuallyFailJob: Boolean = true
}
/**
@@ -194,6 +196,12 @@ case object TaskKilled extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ /**
+ * If a task failed because its attempt to commit was denied, do not count this failure
+ * towards failing the stage. This is intended to prevent spurious stage failures in cases
+ * where many speculative tasks are launched and denied to commit.
+ */
+ override def shouldEventuallyFailJob: Boolean = false
}
/**
@@ -202,8 +210,14 @@ case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extend
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
- override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
+case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val exitBehavior = if (isNormalExit) "normally" else "abnormally"
+ s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+ }
+
+ override def shouldEventuallyFailJob: Boolean = !isNormalExit
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2bc43a9186..0a98c69b89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -23,16 +23,20 @@ import org.apache.spark.executor.ExecutorExitCode
* Represents an explanation for a executor or whole slave failing or exiting.
*/
private[spark]
-class ExecutorLossReason(val message: String) {
+class ExecutorLossReason(val message: String) extends Serializable {
override def toString: String = message
}
private[spark]
-case class ExecutorExited(val exitCode: Int)
- extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+ extends ExecutorLossReason(reason)
+
+private[spark] object ExecutorExited {
+ def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
+ ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+ }
}
private[spark]
case class SlaveLost(_message: String = "Slave lost")
- extends ExecutorLossReason(_message) {
-}
+ extends ExecutorLossReason(_message)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5821afea98..551e39a81b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -83,8 +83,8 @@ private[spark] class Pool(
null
}
- override def executorLost(executorId: String, host: String) {
- schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+ override def executorLost(executorId: String, host: String, reason: ExecutorLossReason) {
+ schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
override def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e6..ab00bc8f0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -42,7 +42,7 @@ private[spark] trait Schedulable {
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
- def executorLost(executorId: String, host: String): Unit
+ def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def checkSpeculatableTasks(): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1705e7f962..1c7bfe89c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -332,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {
- removeExecutor(execId)
+ removeExecutor(execId,
+ SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
}
}
@@ -464,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
if (activeExecutorIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
- removeExecutor(executorId)
+ removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
// We may get multiple executorLost() calls with different loss reasons. For example, one
@@ -482,7 +483,7 @@ private[spark] class TaskSchedulerImpl(
}
/** Remove an executor from all our data structures and mark it as lost */
- private def removeExecutor(executorId: String) {
+ private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -497,7 +498,7 @@ private[spark] class TaskSchedulerImpl(
}
}
executorIdToHost -= executorId
- rootPool.executorLost(executorId, host)
+ rootPool.executorLost(executorId, host, reason)
}
def executorAdded(execId: String, host: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d67f..62af9031b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
}
ef.exception
+ case e: ExecutorLostFailure if e.isNormalExit =>
+ logInfo(s"Task $tid failed because while it was being computed, its executor" +
+ s" exited normally. Not marking the task as failed.")
+ None
+
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
None
@@ -722,10 +727,9 @@ private[spark] class TaskSetManager(
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
- if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
- // If a task failed because its attempt to commit was denied, do not count this failure
- // towards failing the stage. This is intended to prevent spurious stage failures in cases
- // where many speculative tasks are launched and denied to commit.
+ if (!isZombie && state != TaskState.KILLED
+ && reason.isInstanceOf[TaskFailedReason]
+ && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
@@ -778,7 +782,7 @@ private[spark] class TaskSetManager(
}
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
- override def executorLost(execId: String, host: String) {
+ override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
@@ -809,9 +813,12 @@ private[spark] class TaskSetManager(
}
}
}
- // Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
+ val isNormalExit: Boolean = reason match {
+ case exited: ExecutorExited => exited.isNormalExit
+ case _ => false
+ }
+ handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 06f5438433..d947436777 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.{SerializableBuffer, Utils}
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -70,7 +71,8 @@ private[spark] object CoarseGrainedClusterMessages {
case object StopExecutors extends CoarseGrainedClusterMessage
- case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+ case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
+ extends CoarseGrainedClusterMessage
case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
@@ -92,6 +94,10 @@ private[spark] object CoarseGrainedClusterMessages {
hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage
+ // Check if an executor was force-killed but for a normal reason.
+ // This could be the case if the executor is preempted, for instance.
+ case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
+
case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f96..18771f79b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rpc._
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
/**
@@ -82,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override protected def log = CoarseGrainedSchedulerBackend.this.log
- private val addressToExecutorId = new HashMap[RpcAddress, String]
+ protected val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@@ -128,6 +129,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
@@ -185,8 +187,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
- "remote Rpc client disassociated"))
+ addressToExecutorId
+ .get(remoteAddress)
+ .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
}
// Make fake resource offers on just one executor
@@ -227,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: String): Unit = {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -239,9 +242,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ scheduler.executorLost(executorId, reason)
listenerBus.post(
- SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
+ SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None => logInfo(s"Asked to remove non-existent executor $executorId")
}
}
@@ -263,8 +266,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// TODO (prashant) send conf instead of properties
- driverEndpoint = rpcEnv.setupEndpoint(
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
+ driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+ }
+
+ protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new DriverEndpoint(rpcEnv, properties)
}
def stopExecutors() {
@@ -304,7 +310,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Called by subclasses when notified of a lost worker
- def removeExecutor(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason) {
try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bbe51b4a09..27491ecf8b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rpc.RpcAddress
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
@@ -135,11 +135,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
- case Some(code) => ExecutorExited(code)
+ case Some(code) => ExecutorExited(code, isNormalExit = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
- removeExecutor(fullId.split("/")(1), reason.toString)
+ removeExecutor(fullId.split("/")(1), reason)
}
override def sufficientResourcesRegistered(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 044f6288fa..6a4b536dee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,12 +17,13 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, ExecutionContext}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{ThreadUtils, RpcUtils}
@@ -43,8 +44,10 @@ private[spark] abstract class YarnSchedulerBackend(
protected var totalExpectedExecutors = 0
- private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
- YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
+ private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+ private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+ YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
@@ -53,7 +56,7 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
@@ -61,7 +64,7 @@ private[spark] abstract class YarnSchedulerBackend(
* Request that the ApplicationMaster kill the specified executors.
*/
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -90,6 +93,41 @@ private[spark] abstract class YarnSchedulerBackend(
}
}
+ override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new YarnDriverEndpoint(rpcEnv, properties)
+ }
+
+ /**
+ * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
+ * This endpoint communicates with the executors and queries the AM for an executor's exit
+ * status when the executor is disconnected.
+ */
+ private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+ /**
+ * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
+ * handles it by assuming the Executor was lost for a bad reason and removes the executor
+ * immediately.
+ *
+ * In YARN's case however it is crucial to talk to the application master and ask why the
+ * executor had exited. In particular, the executor may have exited due to the executor
+ * having been preempted. If the executor "exited normally" according to the application
+ * master then we pass that information down to the TaskSetManager to inform the
+ * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+ *
+ * TODO there's a race condition where while we are querying the ApplicationMaster for
+ * the executor loss reason, there is the potential that tasks will be scheduled on
+ * the executor that failed. We should fix this by having this onDisconnected event
+ * also "blacklist" executors so that tasks are not assigned to them.
+ */
+ override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+ addressToExecutorId.get(rpcAddress).foreach { executorId =>
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
+ }
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the ApplicationMaster.
*/
@@ -101,6 +139,33 @@ private[spark] abstract class YarnSchedulerBackend(
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+ executorId: String,
+ executorRpcAddress: RpcAddress): Unit = {
+ amEndpoint match {
+ case Some(am) =>
+ val lossReasonRequest = GetExecutorLossReason(executorId)
+ val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ future onSuccess {
+ case reason: ExecutorLossReason => {
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
+ }
+ }
+ future onFailure {
+ case NonFatal(e) => {
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
+ }
+ case t => throw t
+ }
+ case None =>
+ logWarning("Attempted to check for an executor loss reason" +
+ " before the AM has registered!")
+ }
+ }
+
override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
@@ -113,6 +178,7 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutor(executorId, reason)
}
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case r: RequestExecutors =>
amEndpoint match {
@@ -143,7 +209,6 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}
-
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 452c32d541..65df887477 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
- removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+ removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2e424054be..18da6d2491 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -390,7 +390,7 @@ private[spark] class MesosSchedulerBackend(
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
+ recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd6d5..24f78744ad 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,8 +362,9 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
- case ExecutorLostFailure(executorId) =>
- ("Executor ID" -> executorId)
+ case ExecutorLostFailure(executorId, isNormalExit) =>
+ ("Executor ID" -> executorId) ~
+ ("Normal Exit" -> isNormalExit)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -794,8 +795,10 @@ private[spark] object JsonProtocol {
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
+ val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
+ map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"))
+ ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
case `unknownReason` => UnknownReason
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index edbdb485c5..f0eadf2409 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -334,7 +334,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Now mark host2 as dead
sched.removeExecutor("exec2")
- manager.executorLost("exec2", "host2")
+ manager.executorLost("exec2", "host2", SlaveLost())
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
@@ -504,13 +504,36 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
- manager.executorLost("execC", "host2")
+ manager.executorLost("execC", "host2", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
sched.removeExecutor("execD")
- manager.executorLost("execD", "host1")
+ manager.executorLost("execD", "host1", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
+ test("Executors are added but exit normally while running tasks") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execB")),
+ Seq(TaskLocation("host2", "execC")),
+ Seq())
+ val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+ sched.addExecutor("execA", "host1")
+ manager.executorAdded()
+ sched.addExecutor("execC", "host2")
+ manager.executorAdded()
+ assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
+ sched.removeExecutor("execA")
+ manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+ assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+ sched.removeExecutor("execC")
+ manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+
test("test RACK_LOCAL tasks") {
// Assign host1 to rack1
FakeRackUtil.assignHostToRack("host1", "rack1")
@@ -721,8 +744,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
- manager.executorLost("execA", "host1")
- manager.executorLost("execB.2", "host2")
+ manager.executorLost("execA", "host1", SlaveLost())
+ manager.executorLost("execB.2", "host2", SlaveLost())
clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a4139b0..47e548ef0d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,7 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
- testTaskEndReason(ExecutorLostFailure("100"))
+ testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)
// BlockId
@@ -295,10 +295,10 @@ class JsonProtocolSuite extends SparkFunSuite {
test("ExecutorLostFailure backward compatibility") {
// ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
- val executorLostFailure = ExecutorLostFailure("100")
+ val executorLostFailure = ExecutorLostFailure("100", true)
val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
.removeField({ _._1 == "Executor ID" })
- val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+ val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
@@ -577,8 +577,10 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
- case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+ case (ExecutorLostFailure(execId1, isNormalExit1),
+ ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)
+ assert(isNormalExit1 === isNormalExit2)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}