commit    46d1203bf2d01b219c4efc7e0e77a844c0c664da
Author:    w00228970 <wangfei1@huawei.com>          2016-09-28 12:02:59 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>   2016-09-28 12:02:59 -0700
Tree:      37c219e3d7f92dde99870543b694b3dbc77144ec
Parent:    2190037757a81d3172f75227f7891d968e1f0d90
[SPARK-17644][CORE] Do not add failedStages when abortStage for fetch failure
## What changes were proposed in this pull request?

| Time | Thread 1, Job1                 | Thread 2, Job2                                                      |
|:----:|:------------------------------:|:-------------------------------------------------------------------:|
| 1    | abort stage due to FetchFailed |                                                                     |
| 2    | failedStages += failedStage    |                                                                     |
| 3    |                                | task failed due to FetchFailed                                      |
| 4    |                                | cannot post ResubmitFailedStages because failedStages is not empty  |

After step 4, Job2 never resubmits its failed stage and hangs: the ResubmitFailedStages event is scheduled only when failedStages is empty, and the stage left behind by Job1's abort keeps the set non-empty forever. The fix is to stop adding stages to failedStages when abortStage is called for a fetch failure, since an aborted stage will never be resubmitted anyway.

## How was this patch tested?

Added a unit test.

Author: w00228970 <wangfei1@huawei.com>
Author: wangfei <wangfei_hello@126.com>

Closes #15213 from scwf/dag-resubmit.
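The hang is easiest to see in isolation. The sketch below compresses the pre-fix bookkeeping into a few lines of plain Scala; the names are illustrative stand-ins, not the real DAGScheduler API. Job1's abort still adds to failedStages, so Job2's later failure no longer passes the isEmpty check that gates the one-shot ResubmitFailedStages event:

```scala
import scala.collection.mutable

// Illustrative model of the pre-fix bookkeeping (not the real scheduler API).
object FailedStagesRace {
  private val failedStages = mutable.HashSet[String]()

  // Skeleton of the pre-fix FetchFailed handling in handleTaskCompletion.
  private def onFetchFailure(stage: String, shouldAbort: Boolean): Unit = {
    if (shouldAbort) {
      println(s"aborting $stage")  // abortStage(...) in the real code
    } else if (failedStages.isEmpty) {
      println(s"scheduling ResubmitFailedStages for $stage")
    }
    failedStages += stage  // pre-fix: runs even on the abort path
  }

  def main(args: Array[String]): Unit = {
    onFetchFailure("Job1/stage", shouldAbort = true)   // failedStages becomes non-empty
    onFetchFailure("Job2/stage", shouldAbort = false)  // isEmpty is false: no event, hang
    println(s"failedStages = $failedStages")
  }
}
```

Running the sketch prints the abort but never the scheduling line, which is exactly the stuck state the table above describes.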
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 59
2 files changed, 71 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ea0b48f6e..f2517401cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1263,18 +1263,20 @@ class DAGScheduler(
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
- } else if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ } else {
+ if (failedStages.isEmpty) {
+ // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+ // in that case the event will already have been scheduled.
+ // TODO: Cancel running tasks in the stage
+ logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
+ s"$failedStage (${failedStage.name}) due to fetch failure")
+ messageScheduler.schedule(new Runnable {
+ override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+ }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ }
+ failedStages += failedStage
+ failedStages += mapStage
}
- failedStages += failedStage
- failedStages += mapStage
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
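For readers skimming the hunk, the post-fix control flow reduces to the sketch below. The condition on the first branch is reconstructed from the surrounding code of this file, and abortMessage is a stand-in name for the multi-line string built above, so treat this as illustrative rather than compilable:

```scala
// Illustrative reduction of the FetchFailed branch after this patch.
if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
  // Too many consecutive fetch failures: kill the job, but leave
  // failedStages alone so other jobs can still trigger a resubmit.
  abortStage(failedStage, abortMessage, None)  // abortMessage: stand-in name
} else {
  if (failedStages.isEmpty) {
    // First failure since the last resubmit: schedule exactly one event.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  // Only stages that a resubmit can actually help end up in failedStages.
  failedStages += failedStage
  failedStages += mapStage
}
```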
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 6787b30261..bec95d13d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -31,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.shuffle.MetadataFetchFailedException
+import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
@@ -2105,6 +2106,62 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
   }
 
+  test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " +
+    "still behave correctly on fetch failures") {
+    // Runs a job that always encounters a fetch failure, so should eventually be aborted
+    def runJobWithPersistentFetchFailure: Unit = {
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+        case (x, _) => x
+      }.count()
+    }
+
+    // Runs a job that encounters a single fetch failure but succeeds on the second attempt
+    def runJobWithTemporaryFetchFailure: Unit = {
+      object FailThisAttempt {
+        val _fail = new AtomicBoolean(true)
+      }
+      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
+      val shuffleHandle =
+        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      rdd1.map {
+        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
+          throw new FetchFailedException(
+            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+        case (x, _) => x
+      }.count()
+    }
+
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    // Run a second job that will fail due to a fetch failure.
+    // This job will hang without the fix for SPARK-17644.
+    failAfter(10.seconds) {
+      val e = intercept[SparkException] {
+        runJobWithPersistentFetchFailure
+      }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
+    }
+
+    failAfter(10.seconds) {
+      try {
+        runJobWithTemporaryFetchFailure
+      } catch {
+        case e: Throwable => fail("A job with one fetch failure should eventually succeed")
+      }
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
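One detail of runJobWithTemporaryFetchFailure worth calling out: the single failure is driven by AtomicBoolean.getAndSet(false), which returns the previous value, so exactly the first matching task fails and every retry sees the switch already flipped. A standalone sketch of that pattern (plain Scala, no Spark dependency; names are illustrative):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Standalone illustration of the one-shot failure switch used by the test:
// getAndSet(false) returns the previous value, so only the first call fails.
object OneShotFailure {
  private val fail = new AtomicBoolean(true)

  def attempt(x: Int): Int =
    if (x == 1 && fail.getAndSet(false)) {
      throw new RuntimeException("simulated fetch failure")
    } else {
      x
    }

  def main(args: Array[String]): Unit = {
    try attempt(1)
    catch { case e: RuntimeException => println(s"first attempt: ${e.getMessage}") }
    println(s"second attempt: ${attempt(1)}")  // succeeds: the switch is now false
  }
}
```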