diff options
author | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-11 01:25:35 +0800 |
---|---|---|
committer | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-11 01:25:35 +0800 |
commit | ba552851771cf8eaf90b72b661c3df60080d0ef9 (patch) | |
tree | d6a4c36bc1ed7cac2211c9c5eee17109c80171c8 /core | |
parent | 765ebca04f3dce1685c64022425bd281993be90e (diff) | |
download | spark-ba552851771cf8eaf90b72b661c3df60080d0ef9.tar.gz spark-ba552851771cf8eaf90b72b661c3df60080d0ef9.tar.bz2 spark-ba552851771cf8eaf90b72b661c3df60080d0ef9.zip |
Put the periodical resubmitFailedStages() call into a scheduled task
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a73a6e19f4..74995706a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -21,7 +21,8 @@ import java.io.NotSerializableException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger -import akka.actor.{Props, Actor, ActorRef} +import akka.actor._ +import akka.util.duration._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import org.apache.spark._ @@ -110,6 +111,13 @@ class DAGScheduler( val POLL_TIMEOUT = 10L private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { + override def preStart() { + env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { + if (failed.size > 0) + resubmitFailedStages() + } + } + /** * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure * events and responds by launching tasks. This runs in a dedicated thread and receives events @@ -119,22 +127,10 @@ class DAGScheduler( case event: DAGSchedulerEvent => logDebug("Got event of type " + event.getClass.getName) - if (!processEvent(event)) { - val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability - // Periodically resubmit failed stages if some map output fetches have failed and we have - // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails, - // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at - // the same time, so we want to make sure we've identified all the reduce tasks that depend - // on the failed node. - if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) { - resubmitFailedStages() - } else { - submitWaitingStages() - } - } - else { + if (!processEvent(event)) + submitWaitingStages() + else context.stop(self) - } } })) |