author     w00228970 <wangfei1@huawei.com>          2016-09-28 12:02:59 -0700
committer  Shixiong Zhu <shixiong@databricks.com>   2016-09-28 12:02:59 -0700
commit     46d1203bf2d01b219c4efc7e0e77a844c0c664da
tree       37c219e3d7f92dde99870543b694b3dbc77144ec
parent     2190037757a81d3172f75227f7891d968e1f0d90
[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
## What changes were proposed in this pull request?

| Time | Thread 1, Job1                 | Thread 2, Job2                                                      |
|:----:|:-------------------------------|:--------------------------------------------------------------------|
| 1    | abort stage due to FetchFailed |                                                                      |
| 2    | failedStages += failedStage    |                                                                      |
| 3    |                                | task failed due to FetchFailed                                       |
| 4    |                                | cannot post ResubmitFailedStages because failedStages is not empty   |

Job2 in Thread 2 then never resubmits its failed stage and hangs. We should not add stages to failedStages when abortStage is called for a fetch failure.

## How was this patch tested?

Added a unit test.

Author: w00228970 <wangfei1@huawei.com>
Author: wangfei <wangfei_hello@126.com>

Closes #15213 from scwf/dag-resubmit.
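The race is easier to see in isolation. Below is a minimal, self-contained sketch, not Spark's actual DAGScheduler: the object, the `onFetchFailed` method, the stage strings, and the 200 ms delay are all invented for illustration. Only the control flow mirrors the pre-patch code, where `failedStages += stage` runs even on the abort path while the resubmit event is scheduled only when the set is empty.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object FetchFailureRace {
  private val failedStages = mutable.HashSet[String]()
  private val messageScheduler = Executors.newSingleThreadScheduledExecutor()

  private def resubmitFailedStages(): Unit = synchronized {
    println(s"Resubmitting failed stages: $failedStages")
    failedStages.clear()
  }

  // Pre-patch logic: the stage lands in failedStages even on the abort
  // path, where no resubmit event is ever scheduled.
  def onFetchFailed(stage: String, shouldAbort: Boolean): Unit = synchronized {
    if (shouldAbort) {
      println(s"Aborting $stage; its job is dead")  // abortStage(...) path
    } else if (failedStages.isEmpty) {
      // Only the first failure (empty set) schedules the resubmit event;
      // later failures assume an event is already pending.
      messageScheduler.schedule(new Runnable {
        override def run(): Unit = resubmitFailedStages()
      }, 200, TimeUnit.MILLISECONDS)
    }
    failedStages += stage  // BUG: also runs on the abort path above
  }

  def main(args: Array[String]): Unit = {
    onFetchFailed("job1-stage", shouldAbort = true)   // t=1,2: set becomes non-empty
    onFetchFailed("job2-stage", shouldAbort = false)  // t=3,4: sees non-empty set,
                                                      // schedules nothing
    Thread.sleep(500)  // no "Resubmitting" line is ever printed: job2 hangs
    messageScheduler.shutdown()
  }
}
```

Running this prints only the abort message: Job2's failure finds `failedStages` non-empty, assumes a resubmit event is already pending, and waits forever. The patch below moves both `failedStages +=` lines inside the non-abort branch, so an aborted job can no longer strand later jobs.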
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 24
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48f6e..f2517401cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
- } else if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ } else {
+ if (failedStages.isEmpty) {
+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+ // in that case the event will already have been scheduled.
+ // TODO: Cancel running tasks in the stage
+ logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+ s"$failedStage (${failedStage.name}) due to fetch failure")
+ messageScheduler.schedule(new Runnable {
+ override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+ }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ }
+ failedStages += failedStage
+ failedStages += mapStage
}
- failedStages += failedStage
- failedStages += mapStage
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)