diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2014-02-06 16:15:24 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-02-06 16:15:24 -0800 |
commit | 0b448df6ac520a7977b1eb51e8c55e33f3fd2da8 (patch) | |
tree | 29865970e8618fab33f3eaf458a11d4a4aceb96a | |
parent | 18ad59e2c6b7bd009e8ba5ebf8fcf99630863029 (diff) | |
download | spark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.tar.gz spark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.tar.bz2 spark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.zip |
Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Feb 5 11:34:41 2014 -0800
Re-add check for empty set of failed stages
commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 15 23:35:41 2014 -0800
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 33 |
1 file changed, 11 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 821241508e..21d16fabef 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -155,7 +155,6 @@ class DAGScheduler( val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures // Missing tasks from each stage val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] - var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] @@ -177,22 +176,6 @@ class DAGScheduler( def start() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { /** - * A handle to the periodical task, used to cancel the task when the actor is stopped. - */ - var resubmissionTask: Cancellable = _ - - override def preStart() { - import context.dispatcher - /** - * A message is sent to the actor itself periodically to remind the actor to resubmit failed - * stages. In this way, stage resubmission can be done within the same thread context of - * other event processing logic to avoid unnecessary synchronization overhead. - */ - resubmissionTask = context.system.scheduler.schedule( - RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages) - } - - /** * The main event loop of the DAG scheduler. */ def receive = { @@ -207,7 +190,6 @@ class DAGScheduler( if (!processEvent(event)) { submitWaitingStages() } else { - resubmissionTask.cancel() context.stop(self) } } @@ -620,6 +602,8 @@ class DAGScheduler( case ResubmitFailedStages => if (failed.size > 0) { + // Failed stages may be removed by job cancellation, so failed might be empty even if + // the ResubmitFailedStages event has been scheduled. 
resubmitFailedStages() } @@ -926,7 +910,6 @@ class DAGScheduler( // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) running -= failedStage - failed += failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + ") for resubmision due to a fetch failure") @@ -938,10 +921,16 @@ class DAGScheduler( } logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name + "); marking it for resubmission") + if (failed.isEmpty && eventProcessActor != null) { + // Don't schedule an event to resubmit failed stages if failed isn't empty, because + // in that case the event will already have been scheduled. eventProcessActor may be + // null during unit tests. + import env.actorSystem.dispatcher + env.actorSystem.scheduler.scheduleOnce( + RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages) + } + failed += failedStage failed += mapStage - // Remember that a fetch failed now; this is used to resubmit the broken - // stages later, after a small wait (to give other tasks the chance to fail) - lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { handleExecutorLost(bmAddress.executorId, Some(task.epoch)) |