aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-02-24 16:58:57 -0800
committerMatei Zaharia <matei@databricks.com>2014-02-24 16:58:57 -0800
commitd8d190efd2d08c3894b20f6814b10f9ca2157309 (patch)
treede0c21e8ffd3dad5567fed708d145e0f7ea9df64
parentc0ef3afa82c1eaf58ff5efec961540a74b639fd9 (diff)
parent0187cef0f284e6cb22cb3986c327c43304daf57d (diff)
downloadspark-d8d190efd2d08c3894b20f6814b10f9ca2157309.tar.gz
spark-d8d190efd2d08c3894b20f6814b10f9ca2157309.tar.bz2
spark-d8d190efd2d08c3894b20f6814b10f9ca2157309.zip
Merge pull request #641 from mateiz/spark-1124-master
SPARK-1124: Fix infinite retries of reduce stage when a map stage failed In the previous code, if you had a failing map stage and then tried to run reduce stages on it repeatedly, the first reduce stage would fail correctly, but the later ones would mistakenly believe that all map outputs are available and start failing infinitely with fetch failures from "null". See https://spark-project.atlassian.net/browse/SPARK-1124 for an example. This PR also cleans up code style slightly where there was a variable named "s" and some weird map manipulation.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala31
-rw-r--r--core/src/test/scala/org/apache/spark/FailureSuite.scala13
2 files changed, 30 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 729f518b89..dc5b25d845 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -272,8 +272,10 @@ class DAGScheduler(
if (mapOutputTracker.has(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
- for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
- stage.numAvailableOutputs = locs.size
+ for (i <- 0 until locs.size) {
+ stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
+ }
+ stage.numAvailableOutputs = locs.count(_ != null)
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
@@ -373,25 +375,26 @@ class DAGScheduler(
} else {
def removeStage(stageId: Int) {
// data structures based on Stage
- stageIdToStage.get(stageId).foreach { s =>
- if (running.contains(s)) {
+ for (stage <- stageIdToStage.get(stageId)) {
+ if (running.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
- running -= s
+ running -= stage
+ }
+ stageToInfos -= stage
+ for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
+ shuffleToMapStage.remove(k)
}
- stageToInfos -= s
- shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
- shuffleToMapStage.remove(shuffleId))
- if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
+ if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId))
}
- pendingTasks -= s
- if (waiting.contains(s)) {
+ pendingTasks -= stage
+ if (waiting.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
- waiting -= s
+ waiting -= stage
}
- if (failed.contains(s)) {
+ if (failed.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
- failed -= s
+ failed -= stage
}
}
// data structures based on StageId
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index ac3c86778d..f3fb64d87a 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -81,6 +81,19 @@ class FailureSuite extends FunSuite with LocalSparkContext {
FailureSuiteState.clear()
}
+ // Run a map-reduce job in which the map stage always fails.
+ test("failure in a map stage") {
+ sc = new SparkContext("local", "test")
+ val data = sc.makeRDD(1 to 3).map(x => { throw new Exception; (x, x) }).groupByKey(3)
+ intercept[SparkException] {
+ data.collect()
+ }
+ // Make sure that running new jobs with the same map stage also fails
+ intercept[SparkException] {
+ data.collect()
+ }
+ }
+
test("failure because task results are not serializable") {
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)