aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorJason Moore <jasonmoore2k@outlook.com>2016-05-05 11:02:35 +0100
committerSean Owen <sowen@cloudera.com>2016-05-05 11:02:35 +0100
commit77361a433adce109c2b752b11dda25b56eca0352 (patch)
treeb882928be9e386067d736f11283e3dcb8c623919 /core/src
parent104430223eb62a7946f939fbf97242c636adbebe (diff)
downloadspark-77361a433adce109c2b752b11dda25b56eca0352.tar.gz
spark-77361a433adce109c2b752b11dda25b56eca0352.tar.bz2
spark-77361a433adce109c2b752b11dda25b56eca0352.zip
[SPARK-14915][CORE] Don't re-queue a task if another attempt has already succeeded
## What changes were proposed in this pull request? Don't re-queue a task if another attempt has already succeeded. This currently happens when a speculative task is denied from committing the result due to another copy of the task already having succeeded. ## How was this patch tested? I'm running a job which has a fair bit of skew in the processing time across the tasks for speculation to trigger in the last quarter (default settings), causing many commit denied exceptions to be thrown. Previously, these tasks were then being retried over and over again until the stage possibly completes (despite using compute resources on these superfluous tasks). With this change (applied to the 1.6 branch), they no longer retry and the stage completes successfully without these extra task attempts. Author: Jason Moore <jasonmoore2k@outlook.com> Closes #12751 from jasonmoore2k/SPARK-14915.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index bfa1e86749..08d33f688a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -716,7 +716,16 @@ private[spark] class TaskSetManager(
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTimeMillis())
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
- addPendingTask(index)
+
+ if (successful(index)) {
+ logInfo(
+ s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
+ "but another instance of the task has already succeeded, " +
+ "so not re-queuing the task to be re-executed.")
+ } else {
+ addPendingTask(index)
+ }
+
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
&& reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {