author    Charles Reiss <charles@eecs.berkeley.edu>  2013-01-22 00:23:31 -0800
committer Charles Reiss <charles@eecs.berkeley.edu>  2013-01-22 00:23:31 -0800
commit    e353886a8ca6179f25b4176d7a62b5d04ce79276 (patch)
tree      0acb9f0b8bfe0c935143a98ea9afe1461080f491
parent    7d3e359f2c463681cf0128da2c6692beb13dade9 (diff)
Use generation numbers for fetch failure tracking
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 27
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 59f2099e91..39a1e6d6c6 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -72,8 +72,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val cacheTracker = env.cacheTracker
val mapOutputTracker = env.mapOutputTracker
- val deadHosts = new HashSet[String] // TODO: The code currently assumes these can't come back;
- // that's not going to be a realistic assumption in general
+ // For tracking failed nodes, we use the MapOutputTracker's generation number, which is
+ // sent with every task. When we detect a node failing, we note the current generation
+ // number and the failed host, increment the generation for new tasks, and use this to
+ // ignore stray ShuffleMapTask results.
+ // TODO: Garbage collect information about failure generations when new stages start.
+ val failedGeneration = new HashMap[String, Long]
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now
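
The added comment describes the mechanism this patch builds on: every task is stamped with the MapOutputTracker's generation number, and a host's failure is recorded at the generation that was current when the failure was detected. The following is a minimal, self-contained sketch of that idea; GenerationTracker and all of its members are hypothetical names, not Spark's actual MapOutputTracker API.

    import scala.collection.mutable.HashMap

    // Minimal sketch of generation-based staleness tracking (hypothetical API).
    class GenerationTracker {
      private var generation = 0L                               // stamped onto every task at launch
      private val failedGeneration = new HashMap[String, Long]  // host -> generation when it failed

      def currentGeneration: Long = generation

      // Bump the counter so tasks launched from now on outrank anything
      // produced at or before the last recorded failure.
      def incrementGeneration(): Unit = { generation += 1 }

      // Record the generation at which a host was detected as failed.
      def markHostFailed(host: String, failedAt: Long): Unit =
        failedGeneration(host) = failedAt

      // A result is stale if its task was launched at or before the generation
      // in which its host was marked as failed.
      def isStale(host: String, taskGeneration: Long): Boolean =
        failedGeneration.get(host).exists(taskGeneration <= _)
    }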
@@ -429,7 +433,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val status = event.result.asInstanceOf[MapStatus]
val host = status.address.ip
logInfo("ShuffleMapTask finished with host " + host)
- if (!deadHosts.contains(host)) { // TODO: Make sure hostnames are consistent with Mesos
+ if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
+ logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
+ } else {
stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
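
The new condition above is the consumer side of that scheme: a ShuffleMapTask result is accepted only if the task's generation is newer than its host's recorded failure. A short walk-through using the hypothetical GenerationTracker sketch:

    val tracker = new GenerationTracker
    tracker.markHostFailed("host-a", failedAt = tracker.currentGeneration) // fails at generation 0
    tracker.incrementGeneration()                                          // new tasks now carry generation 1

    assert(tracker.isStale("host-a", taskGeneration = 0L))  // straggler launched before the failure: ignored
    assert(!tracker.isStale("host-a", taskGeneration = 1L)) // retry launched afterwards: accepted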
@@ -495,7 +501,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the host as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleHostLost(bmAddress.ip)
+ handleHostLost(bmAddress.ip, Some(task.generation))
}
case other =>
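
Note that handleHostLost now receives the failing task's own generation rather than the tracker's current one: a late fetch failure reported by a pre-failure task then compares as stale and cannot re-mark a host that has already been handled. A hypothetical guard with the same shape as the condition in handleHostLost below, continuing the sketch:

    // Act on a host-lost report only if it is newer than any failure already
    // recorded for that host; stray reports from old tasks fall through.
    def maybeMarkHostLost(tracker: GenerationTracker, host: String,
                          maybeGeneration: Option[Long] = None): Boolean = {
      val gen = maybeGeneration.getOrElse(tracker.currentGeneration)
      if (!tracker.isStale(host, gen)) {
        tracker.markHostFailed(host, failedAt = gen)
        tracker.incrementGeneration() // re-launched tasks must outrank the failure
        true                          // caller should also deregister the host's map outputs
      } else {
        false
      }
    }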
@@ -507,11 +513,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
/**
* Responds to a host being lost. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
+ *
+ * Optionally, the generation during which the failure was observed can be passed, so that
+ * stray fetch failures from old tasks do not retrigger the detection of a node as lost.
*/
- def handleHostLost(host: String) {
- if (!deadHosts.contains(host)) {
+ def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
+ val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
+ if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
+ failedGeneration(host) = currentGeneration
logInfo("Host lost: " + host)
- deadHosts += host
env.blockManager.master.notifyADeadHost(host)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
@@ -519,6 +529,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
}
+ if (shuffleToMapStage.isEmpty) {
+ mapOutputTracker.incrementGeneration()
+ }
cacheTracker.cacheLost(host)
updateCacheLocs()
}
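
The final hunk preserves the invariant the whole scheme depends on: after a failure is recorded, the live generation must be strictly greater than the recorded one, or every freshly launched task would itself be judged stale. Re-registering map outputs already bumps the generation as a side effect (presumably the `true` flag to registerMapOutputs), so when there are no shuffle map stages the patch bumps it explicitly. Demonstrated with the hypothetical sketch:

    val t = new GenerationTracker
    t.markHostFailed("host-b", failedAt = t.currentGeneration)
    assert(t.isStale("host-b", taskGeneration = t.currentGeneration))  // without a bump, retries look stale

    t.incrementGeneration() // the explicit bump restores the invariant
    assert(!t.isStale("host-b", taskGeneration = t.currentGeneration))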