core/src/main/scala/org/apache/spark/TaskEndReason.scala | 18
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 14
core/src/main/scala/org/apache/spark/scheduler/Pool.scala | 4
core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala | 2
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 9
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 21
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 8
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 24
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 6
core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 77
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 4
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 9
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 33
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 10
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 7
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 92
17 files changed, 261 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00dc70..2ae878b3e6 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,6 +48,8 @@ case object Success extends TaskEndReason
sealed trait TaskFailedReason extends TaskEndReason {
/** Error message displayed in the web UI. */
def toErrorString: String
+
+ def shouldEventuallyFailJob: Boolean = true
}
/**
@@ -194,6 +196,12 @@ case object TaskKilled extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+ /**
+ * If a task failed because its attempt to commit was denied, do not count this failure
+ * towards failing the stage. This is intended to prevent spurious stage failures in cases
+ * where many speculative tasks are launched but then denied permission to commit.
+ */
+ override def shouldEventuallyFailJob: Boolean = false
}
/**
@@ -202,8 +210,14 @@ case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extend
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
- override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
+case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val exitBehavior = if (isNormalExit) "normally" else "abnormally"
+ s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+ }
+
+ override def shouldEventuallyFailJob: Boolean = !isNormalExit
}
/**
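
Taken together, the hook added above is what lets "normal" executor exits (most importantly YARN preemption, see the SPARK-8167 note further down) stop counting task failures toward aborting the stage. A minimal sketch of the consuming side, using stand-in types rather than Spark's actual classes:

```scala
// Stand-in types mirroring the shouldEventuallyFailJob contract above.
sealed trait FailedReason {
  def toErrorString: String
  // Defaults to true; reasons that should not fail the job override it.
  def shouldEventuallyFailJob: Boolean = true
}

case class CommitDenied(jobId: Int) extends FailedReason {
  override def toErrorString: String = s"commit denied for job $jobId"
  override def shouldEventuallyFailJob: Boolean = false
}

case class ExecutorLost(execId: String, isNormalExit: Boolean) extends FailedReason {
  override def toErrorString: String = s"executor $execId lost"
  override def shouldEventuallyFailJob: Boolean = !isNormalExit
}

object FailureCounter {
  val maxTaskFailures = 4
  private var numFailures = 0

  // Returns true once the task set should be aborted. Only reasons that
  // vote "yes" move the counter.
  def handleFailure(reason: FailedReason): Boolean = {
    if (reason.shouldEventuallyFailJob) numFailures += 1
    numFailures >= maxTaskFailures
  }

  def main(args: Array[String]): Unit = {
    // Normal exits (e.g. preemption) never accumulate...
    (1 to 10).foreach(_ => assert(!handleFailure(ExecutorLost("1", isNormalExit = true))))
    // ...while abnormal exits still abort after maxTaskFailures attempts.
    assert((1 to maxTaskFailures).map(_ => handleFailure(ExecutorLost("1", false))).last)
  }
}
```
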
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2bc43a9186..0a98c69b89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -23,16 +23,20 @@ import org.apache.spark.executor.ExecutorExitCode
* Represents an explanation for an executor or whole slave failing or exiting.
*/
private[spark]
-class ExecutorLossReason(val message: String) {
+class ExecutorLossReason(val message: String) extends Serializable {
override def toString: String = message
}
private[spark]
-case class ExecutorExited(val exitCode: Int)
- extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+ extends ExecutorLossReason(reason)
+
+private[spark] object ExecutorExited {
+ def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
+ ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+ }
}
private[spark]
case class SlaveLost(_message: String = "Slave lost")
- extends ExecutorLossReason(_message) {
-}
+ extends ExecutorLossReason(_message)
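
The three-argument `ExecutorExited` now carries an explicit reason string, while the companion `apply` keeps two-argument call sites working by deriving the message from the exit code. A self-contained sketch of that factory-overload pattern; `explainExitCode` below is a simplified stand-in for `ExecutorExitCode.explainExitCode`:

```scala
class ExecutorLossReason(val message: String) extends Serializable {
  override def toString: String = message
}

case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
  extends ExecutorLossReason(reason)

object ExecutorExited {
  // Simplified stand-in for ExecutorExitCode.explainExitCode.
  private def explainExitCode(code: Int): String = code match {
    case 0   => "Normal termination"
    case 143 => "Killed by external signal (SIGTERM)"
    case c   => s"Unknown executor exit code ($c)"
  }

  // The two-argument overload derives a human-readable reason from the exit
  // code, so callers with nothing better to say need not build one themselves.
  def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited =
    ExecutorExited(exitCode, isNormalExit, explainExitCode(exitCode))
}
```

A call like `ExecutorExited(143, isNormalExit = true)` resolves to the overload, which is what keeps the Mesos call site later in this patch terse.
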
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5821afea98..551e39a81b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -83,8 +83,8 @@ private[spark] class Pool(
null
}
- override def executorLost(executorId: String, host: String) {
- schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+ override def executorLost(executorId: String, host: String, reason: ExecutorLossReason) {
+ schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
override def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e6..ab00bc8f0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -42,7 +42,7 @@ private[spark] trait Schedulable {
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
- def executorLost(executorId: String, host: String): Unit
+ def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def checkSpeculatableTasks(): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
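
The loss reason now threads through the whole `Schedulable` composite: `TaskSchedulerImpl` hands it to the root pool, pools forward it to their children, and `TaskSetManager` finally interprets it. A compressed sketch of that flow with stand-in types:

```scala
import scala.collection.mutable.ArrayBuffer

class LossReason(val message: String)

trait Schedulable {
  def executorLost(executorId: String, host: String, reason: LossReason): Unit
}

// Interior nodes only forward; they never interpret the reason.
class Pool extends Schedulable {
  val schedulableQueue = new ArrayBuffer[Schedulable]
  override def executorLost(executorId: String, host: String, reason: LossReason): Unit =
    schedulableQueue.foreach(_.executorLost(executorId, host, reason))
}

// Leaves decide what the reason means for their own tasks.
class Manager(name: String) extends Schedulable {
  override def executorLost(executorId: String, host: String, reason: LossReason): Unit =
    println(s"$name: re-enqueueing tasks from $executorId (${reason.message})")
}
```
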
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1705e7f962..1c7bfe89c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -332,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {
- removeExecutor(execId)
+ removeExecutor(execId,
+ SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
}
}
@@ -464,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
if (activeExecutorIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
- removeExecutor(executorId)
+ removeExecutor(executorId, reason)
failedExecutor = Some(executorId)
} else {
// We may get multiple executorLost() calls with different loss reasons. For example, one
@@ -482,7 +483,7 @@ private[spark] class TaskSchedulerImpl(
}
/** Remove an executor from all our data structures and mark it as lost */
- private def removeExecutor(executorId: String) {
+ private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -497,7 +498,7 @@ private[spark] class TaskSchedulerImpl(
}
}
executorIdToHost -= executorId
- rootPool.executorLost(executorId, host)
+ rootPool.executorLost(executorId, host, reason)
}
def executorAdded(execId: String, host: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d67f..62af9031b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
}
ef.exception
+ case e: ExecutorLostFailure if e.isNormalExit =>
+ logInfo(s"Task $tid failed because while it was being computed, its executor" +
+ s" exited normally. Not marking the task as failed.")
+ None
+
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
None
@@ -722,10 +727,9 @@ private[spark] class TaskSetManager(
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
- if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
- // If a task failed because its attempt to commit was denied, do not count this failure
- // towards failing the stage. This is intended to prevent spurious stage failures in cases
- // where many speculative tasks are launched and denied to commit.
+ if (!isZombie && state != TaskState.KILLED
+ && reason.isInstanceOf[TaskFailedReason]
+ && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
@@ -778,7 +782,7 @@ private[spark] class TaskSetManager(
}
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
- override def executorLost(execId: String, host: String) {
+ override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
@@ -809,9 +813,12 @@ private[spark] class TaskSetManager(
}
}
}
- // Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
+ val isNormalExit: Boolean = reason match {
+ case exited: ExecutorExited => exited.isNormalExit
+ case _ => false
+ }
+ handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
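
Two decisions now happen on this path: `executorLost` maps the `ExecutorLossReason` to an `isNormalExit` flag (only `ExecutorExited` can report a normal exit), and `handleFailedTask` consults `shouldEventuallyFailJob` before counting. The `isInstanceOf`/`asInstanceOf` pair in the counting guard can equivalently be written as a pattern match; a sketch with stand-in types:

```scala
sealed trait LossReason
case class Exited(exitCode: Int, isNormalExit: Boolean) extends LossReason
case object SlaveGone extends LossReason

trait FailedReason { def shouldEventuallyFailJob: Boolean }

object TaskSetManagerSketch {
  // Mirrors the match in executorLost above: anything that is not an
  // explicitly normal ExecutorExited is treated as abnormal.
  def isNormalExit(reason: LossReason): Boolean = reason match {
    case Exited(_, normal) => normal
    case _                 => false
  }

  // Equivalent to the isInstanceOf/asInstanceOf guard in handleFailedTask,
  // written as a single pattern match.
  def countsTowardAbort(reason: Any): Boolean = reason match {
    case r: FailedReason => r.shouldEventuallyFailJob
    case _               => false
  }
}
```
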
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 06f5438433..d947436777 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.{SerializableBuffer, Utils}
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -70,7 +71,8 @@ private[spark] object CoarseGrainedClusterMessages {
case object StopExecutors extends CoarseGrainedClusterMessage
- case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+ case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
+ extends CoarseGrainedClusterMessage
case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
@@ -92,6 +94,10 @@ private[spark] object CoarseGrainedClusterMessages {
hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage
+ // Check if an executor was force-killed but for a normal reason.
+ // This could be the case if the executor is preempted, for instance.
+ case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
+
case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f96..18771f79b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rpc._
import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
/**
@@ -82,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override protected def log = CoarseGrainedSchedulerBackend.this.log
- private val addressToExecutorId = new HashMap[RpcAddress, String]
+ protected val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@@ -128,6 +129,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
@@ -185,8 +187,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
- "remote Rpc client disassociated"))
+ addressToExecutorId
+ .get(remoteAddress)
+ .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
}
// Make fake resource offers on just one executor
@@ -227,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
- def removeExecutor(executorId: String, reason: String): Unit = {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -239,9 +242,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ scheduler.executorLost(executorId, reason)
listenerBus.post(
- SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
+ SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
case None => logInfo(s"Asked to remove non-existent executor $executorId")
}
}
@@ -263,8 +266,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// TODO (prashant) send conf instead of properties
- driverEndpoint = rpcEnv.setupEndpoint(
- CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
+ driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+ }
+
+ protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new DriverEndpoint(rpcEnv, properties)
}
def stopExecutors() {
@@ -304,7 +310,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Called by subclasses when notified of a lost worker
- def removeExecutor(executorId: String, reason: String) {
+ def removeExecutor(executorId: String, reason: ExecutorLossReason) {
try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
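
Extracting `createDriverEndpoint` is a small template-method refactor: endpoint registration stays in the base class, while a subclass (the YARN backend below) substitutes its own `DriverEndpoint`. The shape of the hook, reduced to stand-in classes:

```scala
class DriverEndpoint {
  def onDisconnected(address: String): Unit =
    println(s"removing executor at $address")
}

class BackendSketch {
  // Registration logic stays in the base class...
  def start(): DriverEndpoint = createDriverEndpoint()
  // ...while the concrete endpoint class is a protected extension point.
  protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint
}

class YarnBackendSketch extends BackendSketch {
  private class YarnDriverEndpoint extends DriverEndpoint {
    // YARN asks the AM for the loss reason instead of removing immediately.
    override def onDisconnected(address: String): Unit =
      println(s"querying AM about executor at $address")
  }
  override protected def createDriverEndpoint(): DriverEndpoint = new YarnDriverEndpoint
}
```
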
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bbe51b4a09..27491ecf8b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rpc.RpcAddress
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
@@ -135,11 +135,11 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
- case Some(code) => ExecutorExited(code)
+ case Some(code) => ExecutorExited(code, isNormalExit = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
- removeExecutor(fullId.split("/")(1), reason.toString)
+ removeExecutor(fullId.split("/")(1), reason)
}
override def sufficientResourcesRegistered(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 044f6288fa..6a4b536dee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,12 +17,13 @@
package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, ExecutionContext}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler._
import org.apache.spark.ui.JettyUtils
import org.apache.spark.util.{ThreadUtils, RpcUtils}
@@ -43,8 +44,10 @@ private[spark] abstract class YarnSchedulerBackend(
protected var totalExpectedExecutors = 0
- private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
- YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
+ private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+ private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+ YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
@@ -53,7 +56,7 @@ private[spark] abstract class YarnSchedulerBackend(
* This includes executors already pending or running.
*/
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
}
@@ -61,7 +64,7 @@ private[spark] abstract class YarnSchedulerBackend(
* Request that the ApplicationMaster kill the specified executors.
*/
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
- yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+ yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
}
override def sufficientResourcesRegistered(): Boolean = {
@@ -90,6 +93,41 @@ private[spark] abstract class YarnSchedulerBackend(
}
}
+ override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new YarnDriverEndpoint(rpcEnv, properties)
+ }
+
+ /**
+ * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
+ * This endpoint communicates with the executors and queries the AM for an executor's exit
+ * status when the executor is disconnected.
+ */
+ private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+ /**
+ * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
+ * handles it by assuming the Executor was lost for a bad reason and removes the executor
+ * immediately.
+ *
+ * In YARN's case however it is crucial to talk to the application master and ask why the
+ * executor had exited. In particular, the executor may have exited due to the executor
+ * having been preempted. If the executor "exited normally" according to the application
+ * master then we pass that information down to the TaskSetManager to inform the
+ * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+ *
+ * TODO there's a race condition where while we are querying the ApplicationMaster for
+ * the executor loss reason, there is the potential that tasks will be scheduled on
+ * the executor that failed. We should fix this by having this onDisconnected event
+ * also "blacklist" executors so that tasks are not assigned to them.
+ */
+ override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+ addressToExecutorId.get(rpcAddress).foreach { executorId =>
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
+ }
+ }
+
/**
* An [[RpcEndpoint]] that communicates with the ApplicationMaster.
*/
@@ -101,6 +139,33 @@ private[spark] abstract class YarnSchedulerBackend(
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
+ private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+ executorId: String,
+ executorRpcAddress: RpcAddress): Unit = {
+ amEndpoint match {
+ case Some(am) =>
+ val lossReasonRequest = GetExecutorLossReason(executorId)
+ val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+ future onSuccess {
+ case reason: ExecutorLossReason => {
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
+ }
+ }
+ future onFailure {
+ case NonFatal(e) => {
+ logWarning(s"Attempted to get executor loss reason" +
+ s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+ s" but got no response. Marking as slave lost.", e)
+ driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
+ }
+ case t => throw t
+ }
+ case None =>
+ logWarning("Attempted to check for an executor loss reason" +
+ " before the AM has registered!")
+ }
+ }
+
override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
@@ -113,6 +178,7 @@ private[spark] abstract class YarnSchedulerBackend(
removeExecutor(executorId, reason)
}
+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case r: RequestExecutors =>
amEndpoint match {
@@ -143,7 +209,6 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}
-
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
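
The ask-then-fallback flow in `handleExecutorDisconnectedFromDriver` generalizes to: query the AM, forward its answer, and degrade to `SlaveLost()` if the query fails, so the executor is removed either way. A sketch with plain `Future`s standing in for Spark's RPC layer; `askAm` and `remove` are hypothetical stand-ins for `am.ask[ExecutorLossReason]` and the `RemoveExecutor` round trip:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

object LossReasonQuery {
  def removeWithBestReason(
      askAm: String => Future[String],   // stands in for am.ask[ExecutorLossReason]
      remove: (String, String) => Unit,  // stands in for the RemoveExecutor round trip
      executorId: String)(implicit ec: ExecutionContext): Unit = {
    askAm(executorId).onComplete {
      case Success(reason)      => remove(executorId, reason)
      case Failure(NonFatal(_)) => remove(executorId, "Slave lost") // degrade gracefully
      case Failure(fatal)       => throw fatal
    }
  }
}
```

`onComplete` collapses the patch's separate `onSuccess`/`onFailure` callbacks (later deprecated in Scala 2.12) into one handler; the behavior is the same.
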
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 452c32d541..65df887477 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
- removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+ removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2e424054be..18da6d2491 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -390,7 +390,7 @@ private[spark] class MesosSchedulerBackend(
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
+ recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd6d5..24f78744ad 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,8 +362,9 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
- case ExecutorLostFailure(executorId) =>
- ("Executor ID" -> executorId)
+ case ExecutorLostFailure(executorId, isNormalExit) =>
+ ("Executor ID" -> executorId) ~
+ ("Normal Exit" -> isNormalExit)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -794,8 +795,10 @@ private[spark] object JsonProtocol {
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `executorLostFailure` =>
+ val isNormalExit = Utils.jsonOption(json \ "Normal Exit")
+ .map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"))
+ ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
case `unknownReason` => UnknownReason
}
}
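
The read side stays backward compatible: event logs written before this change have no "Normal Exit" field, and the `Option`-based lookup defaults it to `false`. A runnable sketch of that round trip using json4s (the library `JsonProtocol` is built on); `jsonOption` here mirrors Spark's `Utils.jsonOption`:

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object NormalExitCompat {
  implicit val formats: Formats = DefaultFormats

  // JNothing (field absent in pre-upgrade logs) becomes None.
  def jsonOption(json: JValue): Option[JValue] = json match {
    case JNothing => None
    case value    => Some(value)
  }

  def readIsNormalExit(json: JValue): Boolean =
    jsonOption(json \ "Normal Exit").map(_.extract[Boolean]).getOrElse(false)

  def main(args: Array[String]): Unit = {
    val newEvent: JValue = ("Executor ID" -> "100") ~ ("Normal Exit" -> true)
    val oldEvent: JValue = "Executor ID" -> "100" // written by an older Spark
    assert(readIsNormalExit(newEvent))
    assert(!readIsNormalExit(oldEvent))           // missing field defaults to false
    println(compact(render(newEvent)))
  }
}
```
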
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index edbdb485c5..f0eadf2409 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -334,7 +334,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Now mark host2 as dead
sched.removeExecutor("exec2")
- manager.executorLost("exec2", "host2")
+ manager.executorLost("exec2", "host2", SlaveLost())
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
@@ -504,13 +504,36 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
- manager.executorLost("execC", "host2")
+ manager.executorLost("execC", "host2", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
sched.removeExecutor("execD")
- manager.executorLost("execD", "host1")
+ manager.executorLost("execD", "host1", SlaveLost())
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
+ test("Executors are added but exit normally while running tasks") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc)
+ val taskSet = FakeTask.createTaskSet(4,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execB")),
+ Seq(TaskLocation("host2", "execC")),
+ Seq())
+ val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+ sched.addExecutor("execA", "host1")
+ manager.executorAdded()
+ sched.addExecutor("execC", "host2")
+ manager.executorAdded()
+ assert(manager.resourceOffer("execA", "host1", ANY).isDefined)
+ sched.removeExecutor("execA")
+ manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+ assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+ sched.removeExecutor("execC")
+ manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+
test("test RACK_LOCAL tasks") {
// Assign host1 to rack1
FakeRackUtil.assignHostToRack("host1", "rack1")
@@ -721,8 +744,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
sched.removeExecutor("execA")
sched.removeExecutor("execB.2")
- manager.executorLost("execA", "host1")
- manager.executorLost("execB.2", "host2")
+ manager.executorLost("execA", "host1", SlaveLost())
+ manager.executorLost("execB.2", "host2", SlaveLost())
clock.advance(LOCALITY_WAIT_MS * 4)
sched.addExecutor("execC", "host3")
manager.executorAdded()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a4139b0..47e548ef0d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,7 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
- testTaskEndReason(ExecutorLostFailure("100"))
+ testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)
// BlockId
@@ -295,10 +295,10 @@ class JsonProtocolSuite extends SparkFunSuite {
test("ExecutorLostFailure backward compatibility") {
// ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
- val executorLostFailure = ExecutorLostFailure("100")
+ val executorLostFailure = ExecutorLostFailure("100", true)
val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
.removeField({ _._1 == "Executor ID" })
- val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+ val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
}
@@ -577,8 +577,10 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
- case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+ case (ExecutorLostFailure(execId1, isNormalExit1),
+ ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)
+ assert(isNormalExit1 === isNormalExit2)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5cec00..93621b44c9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -590,6 +590,13 @@ private[spark] class ApplicationMaster(
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
context.reply(true)
+
+ case GetExecutorLossReason(eid) =>
+ Option(allocator) match {
+ case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
+ case None => logWarning(s"Container allocator is not ready to find" +
+ s" executor loss reasons yet.")
+ }
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5f897cbcb4..fd88b8b7fe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,8 +21,9 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
-import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConverters._
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -36,8 +37,9 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.util.Utils
/**
@@ -93,6 +95,11 @@ private[yarn] class YarnAllocator(
sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
}
+ // Executor loss reason requests that are pending - maps from the executor ID being
+ // asked about to the list of requesters that should be responded to once we find
+ // out why the given executor was lost.
+ private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -235,9 +242,7 @@ private[yarn] class YarnAllocator(
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
-
processCompletedContainers(completedContainers.asScala)
-
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
}
@@ -429,7 +434,7 @@ private[yarn] class YarnAllocator(
for (completedContainer <- completedContainers) {
val containerId = completedContainer.getContainerId
val alreadyReleased = releasedContainers.remove(containerId)
- if (!alreadyReleased) {
+ val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
@@ -440,22 +445,42 @@ private[yarn] class YarnAllocator(
// Hadoop 2.2.X added a ContainerExitStatus that we should switch to use:
// there are some exit statuses we shouldn't necessarily count against us, but for
// now I think it's OK as none of the containers are expected to exit.
- if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
- logInfo("Container preempted: " + containerId)
- } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- VMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- PMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus != 0) {
- logInfo("Container marked as failed: " + containerId +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
- numExecutorsFailed += 1
+ val exitStatus = completedContainer.getExitStatus
+ val (isNormalExit, containerExitReason) = exitStatus match {
+ case ContainerExitStatus.SUCCESS =>
+ (true, s"Executor for container $containerId exited normally.")
+ case ContainerExitStatus.PREEMPTED =>
+ // Preemption should count as a normal exit, since YARN preempts containers merely
+ // to do resource sharing, and tasks that fail due to preempted executors could
+ // just as easily finish on any other executor. See SPARK-8167.
+ (true, s"Container $containerId was preempted.")
+ // Should probably still count memory exceeded exit codes towards task failures
+ case VMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ case PMEM_EXCEEDED_EXIT_CODE =>
+ (false, memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ case unknown =>
+ numExecutorsFailed += 1
+ (false, "Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+
+ }
+ if (isNormalExit) {
+ logInfo(containerExitReason)
+ } else {
+ logWarning(containerExitReason)
}
+ ExecutorExited(0, isNormalExit, containerExitReason)
+ } else {
+ // If we have already released this container, then it must mean
+ // that the driver has explicitly requested it to be killed
+ ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+ s"Container $containerId exited from explicit termination request.")
}
if (allocatedContainerToHostMap.contains(containerId)) {
@@ -474,18 +499,35 @@ private[yarn] class YarnAllocator(
containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
-
+ pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
+ // Notify application of executor loss reasons so it can decide whether it should abort
+ pendingRequests.foreach(_.reply(exitReason))
+ }
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
// Notify backend about the failure of the executor
numUnexpectedContainerRelease += 1
- driverRef.send(RemoveExecutor(eid,
- s"Yarn deallocated the executor $eid (container $containerId)"))
+ driverRef.send(RemoveExecutor(eid, exitReason))
}
}
}
}
+ /**
+ * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
+ * we can only find the loss reason to send back in the next call to allocateResources().
+ */
+ private[yarn] def enqueueGetLossReasonRequest(
+ eid: String,
+ context: RpcCallContext): Unit = synchronized {
+ if (executorIdToContainer.contains(eid)) {
+ pendingLossReasonRequests
+ .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+ } else {
+ logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+ }
+ }
+
private def internalReleaseContainer(container: Container): Unit = {
releasedContainers.add(container.getId())
amClient.releaseAssignedContainer(container.getId())
@@ -501,6 +543,8 @@ private object YarnAllocator {
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
val VMEM_EXCEEDED_PATTERN =
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+ val VMEM_EXCEEDED_EXIT_CODE = -103
+ val PMEM_EXCEEDED_EXIT_CODE = -104
def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
val matcher = pattern.matcher(diagnostics)
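
The bookkeeping behind `enqueueGetLossReasonRequest` is worth seeing in isolation: asks can arrive before the loss reason is known, so they are parked per executor ID and all answered on the next `allocateResources()` pass. A sketch with `ReplyHandle` as a stand-in for Spark's `RpcCallContext`:

```scala
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap}

trait ReplyHandle { def reply(response: Any): Unit }

class PendingLossReasons {
  private val pending = new HashMap[String, mutable.Buffer[ReplyHandle]]

  // Called from the RPC thread when GetExecutorLossReason arrives; the
  // answer is not known yet, so the requester is parked.
  def enqueue(executorId: String, context: ReplyHandle): Unit = synchronized {
    pending.getOrElseUpdate(executorId, new ArrayBuffer[ReplyHandle]) += context
  }

  // Called from processCompletedContainers once the exit reason is known;
  // every parked requester gets the same answer.
  def resolve(executorId: String, reason: String): Unit = synchronized {
    pending.remove(executorId).foreach(_.foreach(_.reply(reason)))
  }
}
```

Both methods synchronize on the instance because enqueues come from the RPC thread while resolutions come from the allocator's reporting thread, matching the `synchronized` on the method added above.
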