| author | w00228970 <wangfei1@huawei.com> | 2016-09-28 12:02:59 -0700 |
|---|---|---|
| committer | Shixiong Zhu <shixiong@databricks.com> | 2016-09-28 12:02:59 -0700 |
| commit | 46d1203bf2d01b219c4efc7e0e77a844c0c664da | |
| tree | 37c219e3d7f92dde99870543b694b3dbc77144ec /core/src/main/scala/org/apache | |
| parent | 2190037757a81d3172f75227f7891d968e1f0d90 | |
[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
## What changes were proposed in this pull request?
The two jobs race as follows:

| Time | Thread 1 (Job 1) | Thread 2 (Job 2) |
|:----:|:----------------|:----------------|
| 1 | aborts a stage due to `FetchFailed` | |
| 2 | `failedStages += failedStage` | |
| 3 | | a task fails due to `FetchFailed` |
| 4 | | cannot post `ResubmitFailedStages` because `failedStages` is not empty |
As a result, Job 2 in thread 2 never resubmits its failed stage and hangs forever.

We should not add stages to `failedStages` when `abortStage` is called for a fetch failure.
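The interleaving above can be reduced to a small sketch. The essence of the fix is that the `failedStages += ...` updates move inside the non-abort branch, so an aborted job no longer leaves `failedStages` non-empty and a later fetch failure in another job still passes the `failedStages.isEmpty` check and schedules a resubmission. This is a simplified model of the corrected control flow, not the real DAGScheduler code; `shouldAbort` and the `println` stand-ins for `abortStage` and the resubmit event are illustrative only:

```scala
import scala.collection.mutable

// Simplified model of the fetch-failure handling after the fix.
object FetchFailureSketch {
  val failedStages = mutable.Set[Int]()

  def handleFetchFailure(failedStage: Int, mapStage: Int, shouldAbort: Boolean): Unit = {
    if (shouldAbort) {
      // Abort path: the job is given up on, so the stages are NOT added to
      // failedStages. Before the fix they were added here too, which left the
      // set non-empty and silently suppressed resubmission for other jobs.
      println(s"aborting stage $failedStage")
    } else {
      if (failedStages.isEmpty) {
        // Only the first failure schedules a ResubmitFailedStages event;
        // subsequent failures piggyback on the already-scheduled event.
        println("scheduling ResubmitFailedStages")
      }
      failedStages += failedStage
      failedStages += mapStage
    }
  }
}
```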
## How was this patch tested?
Added a unit test.
Author: w00228970 <wangfei1@huawei.com>
Author: wangfei <wangfei_hello@126.com>
Closes #15213 from scwf/dag-resubmit.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 24 |
1 file changed, 13 insertions, 11 deletions
```diff
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48f6e..f2517401cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
             s"has failed the maximum allowable number of " +
             s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
             s"Most recent failure reason: ${failureMessage}", None)
-        } else if (failedStages.isEmpty) {
-          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-          // in that case the event will already have been scheduled.
-          // TODO: Cancel running tasks in the stage
-          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-            s"$failedStage (${failedStage.name}) due to fetch failure")
-          messageScheduler.schedule(new Runnable {
-            override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-          }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+        } else {
+          if (failedStages.isEmpty) {
+            // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+            // in that case the event will already have been scheduled.
+            // TODO: Cancel running tasks in the stage
+            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+              s"$failedStage (${failedStage.name}) due to fetch failure")
+            messageScheduler.schedule(new Runnable {
+              override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+            }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          }
+          failedStages += failedStage
+          failedStages += mapStage
         }
-        failedStages += failedStage
-        failedStages += mapStage
       // Mark the map whose fetch failed as broken in the map stage
       if (mapId != -1) {
         mapStage.removeOutputLoc(mapId, bmAddress)
```