author    Jin Xing <jinxing@meituan.com>    2017-02-06 10:51:26 -0800
committer Kay Ousterhout <kayousterhout@gmail.com>    2017-02-06 10:51:26 -0800
commit    d33021b3124b53e6ef3953f163b312a8ead186a2 (patch)
tree      f87fce29ee560cac27aae118fd3fc49f8a90a38a
parent    aff53021cf828cd7c139d8ec230d45593078b73a (diff)
[SPARK-19398] Change a misleading log message in TaskSetManager.
## What changes were proposed in this pull request?

The log message below is misleading:

```
if (successful(index)) {
  logInfo(
    s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
    "but another instance of the task has already succeeded, " +
    "so not re-queuing the task to be re-executed.")
}
```

If a task fails with a fetch failure, it is marked as successful in `TaskSetManager::handleFailedTask`, and the log above is then printed. Here `successful` only means the task will no longer be scheduled, not that it actually succeeded.

## How was this patch tested?

Existing unit tests cover this.

Author: jinxing <jinxing@meituan.com>

Closes #16738 from jinxing64/SPARK-19398.
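For readers less familiar with the scheduler, here is a minimal, self-contained sketch of the control flow the message describes. It is not Spark's `TaskSetManager`: the class `SimpleTaskTracker`, the `TaskFailure` type, and the simplified `handleFailedTask` signature are illustrative assumptions; only the `successful(index)` check guarding the re-queue mirrors the behavior discussed above.

```scala
// Illustrative sketch only -- not Spark's TaskSetManager. A task index that is
// already marked "successful" (because another copy finished, or because a
// fetch failure means the previous stage must be re-run first) is never
// re-queued for execution within this task set.
sealed trait TaskFailure
case object FetchFailed extends TaskFailure
case object OtherFailure extends TaskFailure

class SimpleTaskTracker(numTasks: Int) {
  private val successful = new Array[Boolean](numTasks)
  private val pendingTasks = scala.collection.mutable.Queue.empty[Int]

  def handleFailedTask(index: Int, reason: TaskFailure): Unit = {
    reason match {
      case FetchFailed =>
        // The missing shuffle output has to be regenerated by re-running the
        // previous stage, so mark the task as "done" for this task set.
        successful(index) = true
      case OtherFailure =>
        () // keep successful(index) unchanged; the task may still be retried
    }
    if (successful(index)) {
      println(s"Task $index failed, but the task will not be re-executed " +
        "(either because the task failed with a shuffle data fetch failure, " +
        "so the previous stage needs to be re-run, or because a different " +
        "copy of the task has already succeeded).")
    } else {
      pendingTasks.enqueue(index) // re-queue the task for another attempt
    }
  }
}
```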
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala  12
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c7ff13cebf..88251435a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -70,6 +70,10 @@ private[spark] class TaskSetManager(
   val tasks = taskSet.tasks
   val numTasks = tasks.length
   val copiesRunning = new Array[Int](numTasks)
+
+  // For each task, tracks whether a copy of the task has succeeded. A task will also be
+  // marked as "succeeded" if it failed with a fetch failure, in which case it should not
+  // be re-run because the missing map data needs to be regenerated first.
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
@@ -797,10 +801,10 @@ private[spark] class TaskSetManager(
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
     if (successful(index)) {
-      logInfo(
-        s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
-        "but another instance of the task has already succeeded, " +
-        "so not re-queuing the task to be re-executed.")
+      logInfo(s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, but the task will not" +
+        s" be re-executed (either because the task failed with a shuffle data fetch failure," +
+        s" so the previous stage needs to be re-run, or because a different copy of the task" +
+        s" has already succeeded).")
     } else {
       addPendingTask(index)
     }