aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorLian, Cheng <rhythm.mail@gmail.com>2013-11-29 15:56:47 +0800
committerLian, Cheng <rhythm.mail@gmail.com>2013-11-29 15:56:47 +0800
commit1e25086009ff6421790609e406d00e1b978d6dbe (patch)
tree809903f21ccb9d0beb751e272c68d14bd8a7c2f0 /core
parent18def5d6f20b33c946f9b8b2cea8cfb6848dcc34 (diff)
downloadspark-1e25086009ff6421790609e406d00e1b978d6dbe.tar.gz
spark-1e25086009ff6421790609e406d00e1b978d6dbe.tar.bz2
spark-1e25086009ff6421790609e406d00e1b978d6dbe.zip
Updated some inline comments in DAGScheduler
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala31
1 files changed, 26 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2bf08c33f..08cf76325b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -154,24 +154,43 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ /**
+ * Starts the event processing actor. The actor has two responsibilities:
+ *
+ * 1. Waits for events like job submission, task finished, task failure etc., and calls
+ * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+ * 2. Schedules a periodical task to resubmit failed stages.
+ *
+ * NOTE: the actor cannot be started in the constructor, because the periodical task references
+ * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
+ * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+ */
def start() {
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
var resubmissionTask: Cancellable = _
override def preStart() {
+ /**
+ * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+ * stages. In this way, stage resubmission can be done within the same thread context of
+ * other event processing logic to avoid unnecessary synchronization overhead.
+ */
resubmissionTask = context.system.scheduler.schedule(
RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
}
/**
- * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
- * events and responds by launching tasks. This runs in a dedicated thread and receives events
- * via the eventQueue.
+ * The main event loop of the DAG scheduler.
*/
def receive = {
case event: DAGSchedulerEvent =>
logDebug("Got event of type " + event.getClass.getName)
+ /**
+ * All events are forwarded to `processEvent()`, so that the event processing logic can
+ * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
+ * for details.
+ */
if (!processEvent(event)) {
submitWaitingStages()
} else {
@@ -383,8 +402,10 @@ class DAGScheduler(
}
/**
- * Process one event retrieved from the event queue.
- * Returns true if we should stop the event loop.
+ * Process one event retrieved from the event processing actor.
+ *
+ * @param event The event to be processed.
+ * @return `true` if we should stop the event loop.
*/
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {