author     Reynold Xin <rxin@apache.org>          2014-08-26 21:59:48 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-08-26 22:12:37 -0700
commit     bf719056b71d55e1194554661dfa194ed03d364d (patch)
tree       ab678f2739ee60f44193e21af912ef67978cd081 /core
parent     e70aff6c2d7f216060def0bd02be6a3d9017cd13 (diff)
[SPARK-3224] FetchFailed reduce stages should only show up once in failed stages (in UI)
This is a HOTFIX for 1.1.

Author: Reynold Xin <rxin@apache.org>
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #2127 from rxin/SPARK-3224 and squashes the following commits:

effb1ce [Reynold Xin] Move log message.
49282b3 [Reynold Xin] Kay's feedback.
3f01847 [Reynold Xin] Merge pull request #2 from kayousterhout/SPARK-3224
796d282 [Kay Ousterhout] Added unit test for SPARK-3224
3d3d356 [Reynold Xin] Remove map output loc even for repeated FetchFaileds.
1dd3eb5 [Reynold Xin] [SPARK-3224] FetchFailed reduce stages should only show up once in the failed stages UI.
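The core of the fix is an idempotence guard in the FetchFailed handler: only a stage that is still in the running set is marked as finished/failed (and reported to listeners), while resubmission bookkeeping still happens for every event. Below is a minimal, self-contained sketch of that pattern; Stage, FetchFailureTracker and markStageAsFinished are simplified stand-ins for illustration, not the actual DAGScheduler API.

// Minimal sketch (not the real DAGScheduler): only the first FetchFailed seen
// while a stage is still running changes the stage's state; later duplicates
// are ignored, but both stages are always queued for resubmission (the set
// deduplicates repeated additions).
import scala.collection.mutable

case class Stage(id: Int, name: String)

class FetchFailureTracker {
  val runningStages = mutable.HashSet[Stage]()
  val failedStages = mutable.HashSet[Stage]()

  private def markStageAsFinished(stage: Stage, reason: Option[String]): Unit =
    println(s"Stage ${stage.id} (${stage.name}) finished: ${reason.getOrElse("succeeded")}")

  def handleFetchFailed(failedStage: Stage, mapStage: Stage): Unit = {
    // Several reduce tasks running concurrently on different executors may each
    // report a FetchFailed for the same stage; only the first report should
    // transition the stage out of the running set.
    if (runningStages.contains(failedStage)) {
      markStageAsFinished(failedStage, Some("Fetch failure"))
      runningStages -= failedStage
    }
    failedStages += failedStage
    failedStages += mapStage
  }
}

Calling handleFetchFailed twice for the same stage prints the "Fetch failure" transition once and leaves failedStages with one entry per stage, which mirrors the behavior the patch restores in the UI.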
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 32
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 41
2 files changed, 59 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3413198457..2ccc27324a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1045,31 +1045,39 @@ class DAGScheduler(
         stage.pendingTasks += task
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
-        // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
-        markStageAsFinished(failedStage, Some("Fetch failure"))
-        runningStages -= failedStage
-        // TODO: Cancel running tasks in the stage
-        logInfo("Marking " + failedStage + " (" + failedStage.name +
-          ") for resubmision due to a fetch failure")
-        // Mark the map whose fetch failed as broken in the map stage
         val mapStage = shuffleToMapStage(shuffleId)
-        if (mapId != -1) {
-          mapStage.removeOutputLoc(mapId, bmAddress)
-          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+
+        // It is likely that we receive multiple FetchFailed for a single stage (because we have
+        // multiple tasks running concurrently on different executors). In that case, it is possible
+        // the fetch failure has already been handled by the scheduler.
+        if (runningStages.contains(failedStage)) {
+          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
+            s"due to a fetch failure from $mapStage (${mapStage.name})")
+          markStageAsFinished(failedStage, Some("Fetch failure"))
+          runningStages -= failedStage
         }
-        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
-          "); marking it for resubmission")
+
         if (failedStages.isEmpty && eventProcessActor != null) {
           // Don't schedule an event to resubmit failed stages if failed isn't empty, because
           // in that case the event will already have been scheduled. eventProcessActor may be
           // null during unit tests.
+          // TODO: Cancel running tasks in the stage
           import env.actorSystem.dispatcher
+          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+            s"$failedStage (${failedStage.name}) due to fetch failure")
           env.actorSystem.scheduler.scheduleOnce(
             RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
         }
         failedStages += failedStage
         failedStages += mapStage
+
+        // Mark the map whose fetch failed as broken in the map stage
+        if (mapId != -1) {
+          mapStage.removeOutputLoc(mapId, bmAddress)
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
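The accompanying test change (below) switches the listener's failedStages collection from a HashSet to an ArrayBuffer so that a redundant stage-failure notification is actually observable. A small illustrative sketch of why this matters, using a simplified recorder rather than the real SparkListener API:

// Sketch only: a HashSet silently collapses a duplicate stage-failure
// notification, while an ArrayBuffer preserves every event, so a size
// assertion can distinguish "notified once" from "notified twice".
import scala.collection.mutable.ArrayBuffer

object DuplicateFailureCheck extends App {
  val failedStages = new ArrayBuffer[Int]()

  def onStageFailed(stageId: Int): Unit = failedStages += stageId

  onStageFailed(0)
  onStageFailed(0)  // the duplicate notification is recorded, not deduplicated
  assert(failedStages.size == 2)           // visible with an ArrayBuffer
  assert(failedStages.distinct.size == 1)  // a HashSet would only ever show this
}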
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index bd829752eb..f5fed988ad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import scala.collection.mutable.{HashSet, HashMap, Map}
+import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
import scala.language.reflectiveCalls
import akka.actor._
@@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   val WAIT_TIMEOUT_MILLIS = 10000
   val sparkListener = new SparkListener() {
     val successfulStages = new HashSet[Int]()
-    val failedStages = new HashSet[Int]()
+    val failedStages = new ArrayBuffer[Int]()
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
       if (stageInfo.failureReason.isEmpty) {
@@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("trivial shuffle with multiple fetch failures") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+    // The MapOutputTracker should know about both map output locations.
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
+      Array("hostA", "hostB"))
+
+    // The first result task fails, with a fetch failure for the output from the first mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+
+    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      null,
+      Map[Long, Any](),
+      null,
+      null))
+    // The SparkListener should not receive redundant failure events.
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.size == 1)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)