diff options
author | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-28 17:46:06 +0800 |
---|---|---|
committer | Lian, Cheng <rhythm.mail@gmail.com> | 2013-11-28 17:46:06 +0800 |
commit | 18def5d6f20b33c946f9b8b2cea8cfb6848dcc34 (patch) | |
tree | 630c8f05e66cbf056c7ed36532e7b7c4d5373640 /core | |
parent | 743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff) | |
download | spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.tar.gz spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.tar.bz2 spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.zip |
Bugfix: SPARK-965 & SPARK-966
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back DAGScheduler.start(); eventProcessActor is created and started here.
Notice that this function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor
* Add a new DAGSchedulerEvent ResubmitFailedStages
This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.
Please refer to discussion in SPARK-966 for details.
Diffstat (limited to 'core')
3 files changed, 40 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3a80241daa..c314f01894 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -270,6 +270,7 @@ class SparkContext( taskScheduler.start() @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) + dagScheduler.start() ui.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4457525ac8..e2bf08c33f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -113,30 +113,7 @@ class DAGScheduler( // Warns the user if a stage contains a task with size greater than this value (in KB) val TASK_SIZE_TO_WARN = 100 - private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor { - override def preStart() { - context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) { - if (failed.size > 0) { - resubmitFailedStages() - } - } - } - - /** - * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure - * events and responds by launching tasks. This runs in a dedicated thread and receives events - * via the eventQueue. 
- */ - def receive = { - case event: DAGSchedulerEvent => - logDebug("Got event of type " + event.getClass.getName) - - if (!processEvent(event)) - submitWaitingStages() - else - context.stop(self) - } - })) + private var eventProcessActor: ActorRef = _ private[scheduler] val nextJobId = new AtomicInteger(0) @@ -177,6 +154,34 @@ class DAGScheduler( val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup) + def start() { + eventProcessActor = env.actorSystem.actorOf(Props(new Actor { + var resubmissionTask: Cancellable = _ + + override def preStart() { + resubmissionTask = context.system.scheduler.schedule( + RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages) + } + + /** + * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure + * events and responds by launching tasks. This runs in a dedicated thread and receives events + * via the eventQueue. + */ + def receive = { + case event: DAGSchedulerEvent => + logDebug("Got event of type " + event.getClass.getName) + + if (!processEvent(event)) { + submitWaitingStages() + } else { + resubmissionTask.cancel() + context.stop(self) + } + } + })) + } + def addSparkListener(listener: SparkListener) { listenerBus.addListener(listener) } @@ -457,6 +462,11 @@ class DAGScheduler( case TaskSetFailed(taskSet, reason) => abortStage(stageIdToStage(taskSet.stageId), reason) + case ResubmitFailedStages => + if (failed.size > 0) { + resubmitFailedStages() + } + case StopDAGScheduler => // Cancel any active jobs for (job <- activeJobs) { @@ -900,7 +910,9 @@ class DAGScheduler( } def stop() { - eventProcessActor ! StopDAGScheduler + if (eventProcessActor != null) { + eventProcessActor ! 
StopDAGScheduler + } metadataCleaner.cancel() taskSched.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 708d221d60..5353cd24dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -73,4 +73,6 @@ private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerE private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent +private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent + private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent |