aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2015-10-27 16:55:10 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2015-10-27 16:55:10 -0700
commitb960a890561eaf3795b93c621bd95be81e56f5b7 (patch)
tree349ce1e83ccbf44341caf80f61673f34b1f53d33
parent9fbd75ab5d46612e52116ec5b9ced70715cf26b5 (diff)
downloadspark-b960a890561eaf3795b93c621bd95be81e56f5b7.tar.gz
spark-b960a890561eaf3795b93c621bd95be81e56f5b7.tar.bz2
spark-b960a890561eaf3795b93c621bd95be81e56f5b7.zip
[SPARK-11178] Improving naming around task failures.
Commit af3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 introduced new functionality so that if an executor dies for a reason that's not caused by one of the tasks running on the executor (e.g., due to pre-emption), Spark doesn't count the failure towards the maximum number of failures for the task. That commit introduced some vague naming that this commit attempts to fix; in particular: (1) The variable "isNormalExit", which was used to refer to cases where the executor died for a reason unrelated to the tasks running on the machine, has been renamed (and reversed) to "exitCausedByApp". The problem with the existing name is that it's not clear (at least to me!) what it means for an exit to be "normal"; the new name is intended to make the purpose of this variable more clear. (2) The variable "shouldEventuallyFailJob" has been renamed to "countTowardsTaskFailures". This variable is used to determine whether a task's failure should be counted towards the maximum number of failures allowed for a task before the associated Stage is aborted. The problem with the existing name is that it can be confused with implying that the task's failure should immediately cause the stage to fail because it is somehow fatal (this is the case for a fetch failure, for example: if a task fails because of a fetch failure, there's no point in retrying, and the whole stage should be failed). Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #9164 from kayousterhout/SPARK-11178.
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala27
11 files changed, 66 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 9335c5f416..18278b292f 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -53,7 +53,13 @@ sealed trait TaskFailedReason extends TaskEndReason {
/** Error message displayed in the web UI. */
def toErrorString: String
- def shouldEventuallyFailJob: Boolean = true
+ /**
+ * Whether this task failure should be counted towards the maximum number of times the task is
+ * allowed to fail before the stage is aborted. Set to false in cases where the task's failure
+ * was unrelated to the task; for example, if the task failed because the executor it was running
+ * on was killed.
+ */
+ def countTowardsTaskFailures: Boolean = true
}
/**
@@ -208,7 +214,7 @@ case class TaskCommitDenied(
* towards failing the stage. This is intended to prevent spurious stage failures in cases
* where many speculative tasks are launched and denied to commit.
*/
- override def shouldEventuallyFailJob: Boolean = false
+ override def countTowardsTaskFailures: Boolean = false
}
/**
@@ -217,14 +223,18 @@ case class TaskCommitDenied(
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
extends TaskFailedReason {
override def toErrorString: String = {
- val exitBehavior = if (isNormalExit) "normally" else "abnormally"
- s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+ val exitBehavior = if (exitCausedByApp) {
+ "caused by one of the running tasks"
+ } else {
+ "unrelated to the running tasks"
+ }
+ s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
}
- override def shouldEventuallyFailJob: Boolean = !isNormalExit
+ override def countTowardsTaskFailures: Boolean = exitCausedByApp
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 0a98c69b89..33edf25043 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -28,12 +28,15 @@ class ExecutorLossReason(val message: String) extends Serializable {
}
private[spark]
-case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
extends ExecutorLossReason(reason)
private[spark] object ExecutorExited {
- def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
- ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+ def apply(exitCode: Int, exitCausedByApp: Boolean): ExecutorExited = {
+ ExecutorExited(
+ exitCode,
+ exitCausedByApp,
+ ExecutorExitCode.explainExitCode(exitCode))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 987800d3d1..9b3fad9012 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -704,9 +704,10 @@ private[spark] class TaskSetManager(
}
ef.exception
- case e: ExecutorLostFailure if e.isNormalExit =>
+ case e: ExecutorLostFailure if !e.exitCausedByApp =>
logInfo(s"Task $tid failed because while it was being computed, its executor" +
- s" exited normally. Not marking the task as failed.")
+ "exited for a reason unrelated to the task. Not counting this failure towards the " +
+ "maximum number of failures for the task.")
None
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
@@ -724,7 +725,7 @@ private[spark] class TaskSetManager(
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
- && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
+ && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
@@ -797,11 +798,12 @@ private[spark] class TaskSetManager(
}
}
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- val isNormalExit: Boolean = reason match {
- case exited: ExecutorExited => exited.isNormalExit
- case _ => false
+ val exitCausedByApp: Boolean = reason match {
+ case exited: ExecutorExited => exited.exitCausedByApp
+ case _ => true
}
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
+ handleFailedTask(
+ tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4652df32ef..8103efa730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -98,7 +98,7 @@ private[spark] object CoarseGrainedClusterMessages {
hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage
- // Check if an executor was force-killed but for a normal reason.
+ // Check if an executor was force-killed but for a reason unrelated to the running tasks.
// This could be the case if the executor is preempted, for instance.
case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a4214c4961..05d9bc92f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -137,7 +137,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
- case Some(code) => ExecutorExited(code, isNormalExit = false, message)
+ case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 38218b9c08..e483688ede 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -111,10 +111,10 @@ private[spark] abstract class YarnSchedulerBackend(
* immediately.
*
* In YARN's case however it is crucial to talk to the application master and ask why the
- * executor had exited. In particular, the executor may have exited due to the executor
- * having been preempted. If the executor "exited normally" according to the application
- * master then we pass that information down to the TaskSetManager to inform the
- * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+ * executor had exited. If the executor exited for some reason unrelated to the running tasks
+ * (e.g., preemption), according to the application master, then we pass that information down
+ * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
+ * not count towards a job failure.
*
* TODO there's a race condition where while we are querying the ApplicationMaster for
* the executor loss reason, there is the potential that tasks will be scheduled on
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 6196176c7c..aaffac604a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -394,7 +394,7 @@ private[spark] class MesosSchedulerBackend(
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
+ recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a06dc6f709..ad6615c112 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,9 @@ private[spark] object JsonProtocol {
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
- case ExecutorLostFailure(executorId, isNormalExit) =>
+ case ExecutorLostFailure(executorId, exitCausedByApp) =>
("Executor ID" -> executorId) ~
- ("Normal Exit" -> isNormalExit)
+ ("Exit Caused By App" -> exitCausedByApp)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -810,10 +810,9 @@ private[spark] object JsonProtocol {
val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
- val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
- map(_.extract[Boolean])
+ val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
+ ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true))
case `unknownReason` => UnknownReason
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index cd6bf723e7..ecc18fc6e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -511,7 +511,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
- test("Executors are added but exit normally while running tasks") {
+ test("Executors exit for reason unrelated to currently running tasks") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
@@ -526,11 +526,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
manager.executorAdded()
assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
sched.removeExecutor("execA")
- manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+ manager.executorLost(
+ "execA",
+ "host1",
+ ExecutorExited(143, false, "Terminated for reason unrelated to running tasks"))
assert(!sched.taskSetsFailed.contains(taskSet.id))
assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
sched.removeExecutor("execC")
- manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+ manager.executorLost(
+ "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks"))
assert(sched.taskSetsFailed.contains(taskSet.id))
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f9572921f4..86137f259c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -603,10 +603,10 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(jobId1 === jobId2)
assert(partitionId1 === partitionId2)
assert(attemptNumber1 === attemptNumber2)
- case (ExecutorLostFailure(execId1, isNormalExit1),
- ExecutorLostFailure(execId2, isNormalExit2)) =>
+ case (ExecutorLostFailure(execId1, exit1CausedByApp),
+ ExecutorLostFailure(execId2, exit2CausedByApp)) =>
assert(execId1 === execId2)
- assert(isNormalExit1 === isNormalExit2)
+ assert(exit1CausedByApp === exit2CausedByApp)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1deaa3743d..875bbd4e4e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -445,40 +445,41 @@ private[yarn] class YarnAllocator(
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus
- val (isNormalExit, containerExitReason) = exitStatus match {
+ val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
- (true, s"Executor for container $containerId exited normally.")
+ (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
+ "pre-emption) and not because of an error in the running job.")
case ContainerExitStatus.PREEMPTED =>
- // Preemption should count as a normal exit, since YARN preempts containers merely
- // to do resource sharing, and tasks that fail due to preempted executors could
+ // Preemption is not the fault of the running tasks, since YARN preempts containers
+ // merely to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
- (true, s"Container ${containerId}${onHostStr} was preempted.")
+ (false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
- (false, memLimitExceededLogMessage(
+ (true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
case PMEM_EXCEEDED_EXIT_CODE =>
- (false, memLimitExceededLogMessage(
+ (true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case unknown =>
numExecutorsFailed += 1
- (false, "Container marked as failed: " + containerId + onHostStr +
+ (true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
}
- if (isNormalExit) {
- logInfo(containerExitReason)
- } else {
+ if (exitCausedByApp) {
logWarning(containerExitReason)
+ } else {
+ logInfo(containerExitReason)
}
- ExecutorExited(0, isNormalExit, containerExitReason)
+ ExecutorExited(0, exitCausedByApp, containerExitReason)
} else {
// If we have already released this container, then it must mean
// that the driver has explicitly requested it to be killed
- ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+ ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
s"Container $containerId exited from explicit termination request.")
}