diff options
author | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-29 15:56:47 +0800 |
---|---|---|
committer | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-29 15:56:47 +0800 |
commit | 1e25086009ff6421790609e406d00e1b978d6dbe (patch) | |
tree | 809903f21ccb9d0beb751e272c68d14bd8a7c2f0 /core | |
parent | 18def5d6f20b33c946f9b8b2cea8cfb6848dcc34 (diff) | |
download | spark-1e25086009ff6421790609e406d00e1b978d6dbe.tar.gz spark-1e25086009ff6421790609e406d00e1b978d6dbe.tar.bz2 spark-1e25086009ff6421790609e406d00e1b978d6dbe.zip |
Updated some inline comments in DAGScheduler
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e2bf08c33f..08cf76325b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -154,24 +154,43 @@ class DAGScheduler( val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup) + /** + * Starts the event processing actor. The actor has two responsibilities: + * + * 1. Waits for events like job submission, task finished, task failure etc., and calls + * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them. + * 2. Schedules a periodical task to resubmit failed stages. + * + * NOTE: the actor cannot be started in the constructor, because the periodical task references + * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus + * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. + */ def start() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { var resubmissionTask: Cancellable = _ override def preStart() { + /** + * A message is sent to the actor itself periodically to remind the actor to resubmit failed + * stages. In this way, stage resubmission can be done within the same thread context of + * other event processing logic to avoid unnecessary synchronization overhead. + */ resubmissionTask = context.system.scheduler.schedule( RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages) } /** - * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure - * events and responds by launching tasks. This runs in a dedicated thread and receives events - * via the eventQueue. + * The main event loop of the DAG scheduler. */ def receive = { case event: DAGSchedulerEvent => logDebug("Got event of type " + event.getClass.getName) + /** + * All events are forwarded to `processEvent()`, so that the event processing logic can + * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite` + * for details. + */ if (!processEvent(event)) { submitWaitingStages() } else { @@ -383,8 +402,10 @@ class DAGScheduler( } /** - * Process one event retrieved from the event queue. - * Returns true if we should stop the event loop. + * Process one event retrieved from the event processing actor. + * + * @param event The event to be processed. + * @return `true` if we should stop the event loop. */ private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { |