aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorLian, Cheng <rhythm.mail@gmail.com>2013-11-11 01:25:35 +0800
committerLian, Cheng <rhythm.mail@gmail.com>2013-11-11 01:25:35 +0800
commitba552851771cf8eaf90b72b661c3df60080d0ef9 (patch)
treed6a4c36bc1ed7cac2211c9c5eee17109c80171c8 /core
parent765ebca04f3dce1685c64022425bd281993be90e (diff)
downloadspark-ba552851771cf8eaf90b72b661c3df60080d0ef9.tar.gz
spark-ba552851771cf8eaf90b72b661c3df60080d0ef9.tar.bz2
spark-ba552851771cf8eaf90b72b661c3df60080d0ef9.zip
Put the periodical resubmitFailedStages() call into a scheduled task
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala28
1 files changed, 12 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a73a6e19f4..74995706a8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.{Props, Actor, ActorRef}
+import akka.actor._
+import akka.util.duration._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import org.apache.spark._
@@ -110,6 +111,13 @@ class DAGScheduler(
val POLL_TIMEOUT = 10L
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+ override def preStart() {
+ env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+ if (failed.size > 0)
+ resubmitFailedStages()
+ }
+ }
+
/**
* The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
* events and responds by launching tasks. This runs in a dedicated thread and receives events
@@ -119,22 +127,10 @@ class DAGScheduler(
case event: DAGSchedulerEvent =>
logDebug("Got event of type " + event.getClass.getName)
- if (!processEvent(event)) {
- val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
- // Periodically resubmit failed stages if some map output fetches have failed and we have
- // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
- // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
- // the same time, so we want to make sure we've identified all the reduce tasks that depend
- // on the failed node.
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
- resubmitFailedStages()
- } else {
- submitWaitingStages()
- }
- }
- else {
+ if (!processEvent(event))
+ submitWaitingStages()
+ else
context.stop(self)
- }
}
}))