core/src/main/scala/org/apache/spark/TaskEndReason.scala | 22
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 9
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 16
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 2
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 8
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 9
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 10
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 6
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 27
11 files changed, 66 insertions, 47 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 9335c5f416..18278b292f 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -53,7 +53,13 @@ sealed trait TaskFailedReason extends TaskEndReason {
/** Error message displayed in the web UI. */
def toErrorString: String
- def shouldEventuallyFailJob: Boolean = true
+ /**
+ * Whether this task failure should be counted towards the maximum number of times the task is
+ * allowed to fail before the stage is aborted. Set to false in cases where the task's failure
+ * was unrelated to the task; for example, if the task failed because the executor it was running
+ * on was killed.
+ */
+ def countTowardsTaskFailures: Boolean = true
}
/**
@@ -208,7 +214,7 @@ case class TaskCommitDenied(
* towards failing the stage. This is intended to prevent spurious stage failures in cases
* where many speculative tasks are launched and denied to commit.
*/
- override def shouldEventuallyFailJob: Boolean = false
+ override def countTowardsTaskFailures: Boolean = false
}
/**
@@ -217,14 +223,18 @@ case class TaskCommitDenied(
* the task crashed the JVM.
*/
@DeveloperApi
-case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
extends TaskFailedReason {
override def toErrorString: String = {
- val exitBehavior = if (isNormalExit) "normally" else "abnormally"
- s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+ val exitBehavior = if (exitCausedByApp) {
+ "caused by one of the running tasks"
+ } else {
+ "unrelated to the running tasks"
+ }
+ s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
}
- override def shouldEventuallyFailJob: Boolean = !isNormalExit
+ override def countTowardsTaskFailures: Boolean = exitCausedByApp
}
/**
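The renamed flag above replaces shouldEventuallyFailJob with countTowardsTaskFailures across the failure reasons. A minimal, self-contained Scala sketch of that contract, using simplified stand-ins rather than Spark's real TaskFailedReason classes:

object CountTowardsTaskFailuresSketch extends App {
  // Simplified stand-ins for the classes touched above; illustrative only.
  sealed trait FailureReasonSketch {
    // Most failures count towards the per-task failure limit by default.
    def countTowardsTaskFailures: Boolean = true
  }

  // A denied speculative commit is expected and never counted (cf. TaskCommitDenied).
  case object CommitDeniedSketch extends FailureReasonSketch {
    override def countTowardsTaskFailures: Boolean = false
  }

  // An executor loss counts only when the application itself caused the exit.
  final case class ExecutorLostSketch(execId: String, exitCausedByApp: Boolean = true)
      extends FailureReasonSketch {
    override def countTowardsTaskFailures: Boolean = exitCausedByApp
  }

  println(ExecutorLostSketch("1", exitCausedByApp = false).countTowardsTaskFailures) // false
  println(ExecutorLostSketch("2").countTowardsTaskFailures)                          // true
  println(CommitDeniedSketch.countTowardsTaskFailures)                               // false
}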
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 0a98c69b89..33edf25043 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -28,12 +28,15 @@ class ExecutorLossReason(val message: String) extends Serializable {
}
private[spark]
-case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
extends ExecutorLossReason(reason)
private[spark] object ExecutorExited {
- def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
- ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+ def apply(exitCode: Int, exitCausedByApp: Boolean): ExecutorExited = {
+ ExecutorExited(
+ exitCode,
+ exitCausedByApp,
+ ExecutorExitCode.explainExitCode(exitCode))
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 987800d3d1..9b3fad9012 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -704,9 +704,10 @@ private[spark] class TaskSetManager(
}
ef.exception
- case e: ExecutorLostFailure if e.isNormalExit =>
+ case e: ExecutorLostFailure if !e.exitCausedByApp =>
logInfo(s"Task $tid failed because while it was being computed, its executor" +
- s" exited normally. Not marking the task as failed.")
+ "exited for a reason unrelated to the task. Not counting this failure towards the " +
+ "maximum number of failures for the task.")
None
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
@@ -724,7 +725,7 @@ private[spark] class TaskSetManager(
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
- && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
+ && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
@@ -797,11 +798,12 @@ private[spark] class TaskSetManager(
}
}
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- val isNormalExit: Boolean = reason match {
- case exited: ExecutorExited => exited.isNormalExit
- case _ => false
+ val exitCausedByApp: Boolean = reason match {
+ case exited: ExecutorExited => exited.exitCausedByApp
+ case _ => true
}
- handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
+ handleFailedTask(
+ tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
}
// recalculate valid locality levels and waits when executor is lost
recomputeLocality()
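The check in handleFailedTask above is the consumer of the flag: a failure only increments numFailures(index) when countTowardsTaskFailures is true. A hedged sketch of that counting rule, with an illustrative reason type rather than Spark's actual classes:

import scala.collection.mutable

object FailureCountingSketch extends App {
  // Illustrative stand-in for a failure reason carrying the renamed flag.
  final case class ReasonSketch(countTowardsTaskFailures: Boolean, message: String)

  final class FailureCounter(maxTaskFailures: Int) {
    private val numFailures = mutable.Map.empty[Int, Int].withDefaultValue(0)

    // Records one failure of the task at `taskIndex`; returns true when the
    // task set would be aborted, mirroring the numFailures check above.
    def recordFailure(taskIndex: Int, reason: ReasonSketch): Boolean = {
      if (reason.countTowardsTaskFailures) {
        numFailures(taskIndex) += 1
      }
      numFailures(taskIndex) >= maxTaskFailures
    }
  }

  val counter = new FailureCounter(maxTaskFailures = 4)
  val preempted = ReasonSketch(countTowardsTaskFailures = false, "executor preempted")
  val appCrash  = ReasonSketch(countTowardsTaskFailures = true, "task crashed its executor")

  // Any number of preemption-caused losses never aborts the task set...
  println((1 to 10).map(_ => counter.recordFailure(0, preempted)).last) // false
  // ...but maxTaskFailures app-caused failures of the same task does.
  println((1 to 4).map(_ => counter.recordFailure(0, appCrash)).last)   // true
}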
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4652df32ef..8103efa730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -98,7 +98,7 @@ private[spark] object CoarseGrainedClusterMessages {
hostToLocalTaskCount: Map[String, Int])
extends CoarseGrainedClusterMessage
- // Check if an executor was force-killed but for a normal reason.
+ // Check if an executor was force-killed but for a reason unrelated to the running tasks.
// This could be the case if the executor is preempted, for instance.
case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a4214c4961..05d9bc92f2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -137,7 +137,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
val reason: ExecutorLossReason = exitStatus match {
- case Some(code) => ExecutorExited(code, isNormalExit = false, message)
+ case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(fullId, message))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 38218b9c08..e483688ede 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -111,10 +111,10 @@ private[spark] abstract class YarnSchedulerBackend(
* immediately.
*
* In YARN's case however it is crucial to talk to the application master and ask why the
- * executor had exited. In particular, the executor may have exited due to the executor
- * having been preempted. If the executor "exited normally" according to the application
- * master then we pass that information down to the TaskSetManager to inform the
- * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+ * executor had exited. If the executor exited for some reason unrelated to the running tasks
+ * (e.g., preemption), according to the application master, then we pass that information
+ * down to the TaskSetManager so that it knows tasks on that lost executor should
+ * not count towards a job failure.
*
* TODO there's a race condition where while we are querying the ApplicationMaster for
* the executor loss reason, there is the potential that tasks will be scheduled on
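The comment above explains why, on YARN, the loss reason comes from the application master. A toy, self-contained model of that classification step; the outcome types and helper below are hypothetical placeholders, not Spark's RPC code or YARN's real exit-status constants:

object LossReasonClassificationSketch extends App {
  // Hypothetical outcomes the application master might report for a lost container.
  sealed trait ContainerOutcome
  case object PreemptedByYarn extends ContainerOutcome
  final case class ContainerFailed(exitCode: Int) extends ContainerOutcome

  final case class ExecutorLossSketch(exitCausedByApp: Boolean, message: String)

  def classify(executorId: String, outcome: ContainerOutcome): ExecutorLossSketch =
    outcome match {
      case PreemptedByYarn =>
        // Preemption is resource sharing, not an application error (cf. SPARK-8167),
        // so tasks that were running on the executor should not be penalised.
        ExecutorLossSketch(exitCausedByApp = false, s"Executor $executorId was preempted.")
      case ContainerFailed(code) =>
        ExecutorLossSketch(exitCausedByApp = true, s"Executor $executorId exited with code $code.")
    }

  println(classify("7", PreemptedByYarn))     // exitCausedByApp = false
  println(classify("8", ContainerFailed(52))) // exitCausedByApp = true
}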
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 6196176c7c..aaffac604a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -394,7 +394,7 @@ private[spark] class MesosSchedulerBackend(
slaveId: SlaveID, status: Int) {
logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
+ recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a06dc6f709..ad6615c112 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,9 @@ private[spark] object JsonProtocol {
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
- case ExecutorLostFailure(executorId, isNormalExit) =>
+ case ExecutorLostFailure(executorId, exitCausedByApp) =>
("Executor ID" -> executorId) ~
- ("Normal Exit" -> isNormalExit)
+ ("Exit Caused By App" -> exitCausedByApp)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -810,10 +810,9 @@ private[spark] object JsonProtocol {
val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
- val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
- map(_.extract[Boolean])
+ val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
- ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
+ ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true))
case `unknownReason` => UnknownReason
}
}
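The deserialization above also sets the backward-compatibility default: event logs written before this rename carry no "Exit Caused By App" field, so it is read as true and replayed executor-loss failures still count. A small json4s sketch of that fallback (standalone, not Spark's JsonProtocol itself):

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ExitCausedByAppFallbackSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // An ExecutorLostFailure as an older event log might record it: no exit flag at all.
  val oldEvent = parse("""{"Reason": "ExecutorLostFailure", "Executor ID": "exec-1"}""")

  // Mirrors the fallback above: a missing "Exit Caused By App" field is read as true,
  // so the failure still counts towards the task's failure limit when the log is replayed.
  val exitCausedByApp: Boolean =
    (oldEvent \ "Exit Caused By App").toOption.map(_.extract[Boolean]).getOrElse(true)

  println(exitCausedByApp) // true
}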
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index cd6bf723e7..ecc18fc6e1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -511,7 +511,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
- test("Executors are added but exit normally while running tasks") {
+ test("Executors exit for reason unrelated to currently running tasks") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
@@ -526,11 +526,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
manager.executorAdded()
assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
sched.removeExecutor("execA")
- manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+ manager.executorLost(
+ "execA",
+ "host1",
+ ExecutorExited(143, false, "Terminated for reason unrelated to running tasks"))
assert(!sched.taskSetsFailed.contains(taskSet.id))
assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
sched.removeExecutor("execC")
- manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+ manager.executorLost(
+ "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks"))
assert(sched.taskSetsFailed.contains(taskSet.id))
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f9572921f4..86137f259c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -603,10 +603,10 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(jobId1 === jobId2)
assert(partitionId1 === partitionId2)
assert(attemptNumber1 === attemptNumber2)
- case (ExecutorLostFailure(execId1, isNormalExit1),
- ExecutorLostFailure(execId2, isNormalExit2)) =>
+ case (ExecutorLostFailure(execId1, exit1CausedByApp),
+ ExecutorLostFailure(execId2, exit2CausedByApp)) =>
assert(execId1 === execId2)
- assert(isNormalExit1 === isNormalExit2)
+ assert(exit1CausedByApp === exit2CausedByApp)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1deaa3743d..875bbd4e4e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -445,40 +445,41 @@ private[yarn] class YarnAllocator(
// there are some exit statuses we shouldn't necessarily count against us, but for
// now I think it's ok as none of the containers are expected to exit.
val exitStatus = completedContainer.getExitStatus
- val (isNormalExit, containerExitReason) = exitStatus match {
+ val (exitCausedByApp, containerExitReason) = exitStatus match {
case ContainerExitStatus.SUCCESS =>
- (true, s"Executor for container $containerId exited normally.")
+ (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
+ "pre-emption) and not because of an error in the running job.")
case ContainerExitStatus.PREEMPTED =>
- // Preemption should count as a normal exit, since YARN preempts containers merely
- // to do resource sharing, and tasks that fail due to preempted executors could
+ // Preemption is not the fault of the running tasks, since YARN preempts containers
+ // merely to do resource sharing, and tasks that fail due to preempted executors could
// just as easily finish on any other executor. See SPARK-8167.
- (true, s"Container ${containerId}${onHostStr} was preempted.")
+ (false, s"Container ${containerId}${onHostStr} was preempted.")
// Should probably still count memory exceeded exit codes towards task failures
case VMEM_EXCEEDED_EXIT_CODE =>
- (false, memLimitExceededLogMessage(
+ (true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))
case PMEM_EXCEEDED_EXIT_CODE =>
- (false, memLimitExceededLogMessage(
+ (true, memLimitExceededLogMessage(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case unknown =>
numExecutorsFailed += 1
- (false, "Container marked as failed: " + containerId + onHostStr +
+ (true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
}
- if (isNormalExit) {
- logInfo(containerExitReason)
- } else {
+ if (exitCausedByApp) {
logWarning(containerExitReason)
+ } else {
+ logInfo(containerExitReason)
}
- ExecutorExited(0, isNormalExit, containerExitReason)
+ ExecutorExited(0, exitCausedByApp, containerExitReason)
} else {
// If we have already released this container, then it must mean
// that the driver has explicitly requested it to be killed
- ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+ ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
s"Container $containerId exited from explicit termination request.")
}