aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-02-06 16:15:24 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-02-06 16:15:24 -0800
commit0b448df6ac520a7977b1eb51e8c55e33f3fd2da8 (patch)
tree29865970e8618fab33f3eaf458a11d4a4aceb96a
parent18ad59e2c6b7bd009e8ba5ebf8fcf99630863029 (diff)
downloadspark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.tar.gz
spark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.tar.bz2
spark-0b448df6ac520a7977b1eb51e8c55e33f3fd2da8.zip
Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.
Only run ResubmitFailedStages event after a fetch fails Previously, the ResubmitFailedStages event was called every 200 milliseconds, leading to a lot of unnecessary event processing and clogged DAGScheduler logs. Author: Kay Ousterhout <kayousterhout@gmail.com> == Merge branch commits == commit e603784b3a562980e6f1863845097effe2129d3b Author: Kay Ousterhout <kayousterhout@gmail.com> Date: Wed Feb 5 11:34:41 2014 -0800 Re-add check for empty set of failed stages commit d258f0ef50caff4bbb19fb95a6b82186db1935bf Author: Kay Ousterhout <kayousterhout@gmail.com> Date: Wed Jan 15 23:35:41 2014 -0800 Only run ResubmitFailedStages event after a fetch fails Previously, the ResubmitFailedStages event was called every 200 milliseconds, leading to a lot of unnecessary event processing and clogged DAGScheduler logs.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala33
1 files changed, 11 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 821241508e..21d16fabef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -155,7 +155,6 @@ class DAGScheduler(
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
// Missing tasks from each stage
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
- var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
val activeJobs = new HashSet[ActiveJob]
val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -177,22 +176,6 @@ class DAGScheduler(
def start() {
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
/**
- * A handle to the periodical task, used to cancel the task when the actor is stopped.
- */
- var resubmissionTask: Cancellable = _
-
- override def preStart() {
- import context.dispatcher
- /**
- * A message is sent to the actor itself periodically to remind the actor to resubmit failed
- * stages. In this way, stage resubmission can be done within the same thread context of
- * other event processing logic to avoid unnecessary synchronization overhead.
- */
- resubmissionTask = context.system.scheduler.schedule(
- RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
- }
-
- /**
* The main event loop of the DAG scheduler.
*/
def receive = {
@@ -207,7 +190,6 @@ class DAGScheduler(
if (!processEvent(event)) {
submitWaitingStages()
} else {
- resubmissionTask.cancel()
context.stop(self)
}
}
@@ -620,6 +602,8 @@ class DAGScheduler(
case ResubmitFailedStages =>
if (failed.size > 0) {
+ // Failed stages may be removed by job cancellation, so failed might be empty even if
+ // the ResubmitFailedStages event has been scheduled.
resubmitFailedStages()
}
@@ -926,7 +910,6 @@ class DAGScheduler(
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
running -= failedStage
- failed += failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure")
@@ -938,10 +921,16 @@ class DAGScheduler(
}
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission")
+ if (failed.isEmpty && eventProcessActor != null) {
+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+ // in that case the event will already have been scheduled. eventProcessActor may be
+ // null during unit tests.
+ import env.actorSystem.dispatcher
+ env.actorSystem.scheduler.scheduleOnce(
+ RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+ }
+ failed += failedStage
failed += mapStage
- // Remember that a fetch failed now; this is used to resubmit the broken
- // stages later, after a small wait (to give other tasks the chance to fail)
- lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.epoch))