aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-11-30 23:38:49 -0800
committerReynold Xin <rxin@apache.org>2013-11-30 23:38:49 -0800
commit60e23a58b288dae3c87da28e1506323b1d88ee9e (patch)
treee786f51789f1e6f3885f60ce791b40b99ac2f082 /core
parent743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff)
parent4a1d966e26e56fc5d42a828f414b4eca433c3a22 (diff)
downloadspark-60e23a58b288dae3c87da28e1506323b1d88ee9e.tar.gz
spark-60e23a58b288dae3c87da28e1506323b1d88ee9e.tar.bz2
spark-60e23a58b288dae3c87da28e1506323b1d88ee9e.zip
Merge pull request #216 from liancheng/fix-spark-966
Bugfix: SPARK-965 & SPARK-966 SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965 SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966 * Add back `DAGScheduler.start()`, `eventProcessActor` is created and started here. Notice that function is only called by `SparkContext`. * Cancel the scheduled stage resubmission task when stopping `eventProcessActor` * Add a new `DAGSchedulerEvent` `ResubmitFailedStages` This event message is sent by the scheduled stage resubmission task to `eventProcessActor`. In this way, `DAGScheduler.resubmitFailedStages()` is guaranteed to be executed from the same thread that runs `DAGScheduler.processEvent()`. Please refer to discussion in [SPARK-966](https://spark-project.atlassian.net/browse/SPARK-966) for details.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala90
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala2
3 files changed, 66 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3a80241daa..c314f01894 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -270,6 +270,7 @@ class SparkContext(
taskScheduler.start()
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
+ dagScheduler.start()
ui.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525ac8..bc37a70e98 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -113,30 +113,7 @@ class DAGScheduler(
// Warns the user if a stage contains a task with size greater than this value (in KB)
val TASK_SIZE_TO_WARN = 100
- private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
- override def preStart() {
- context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
- if (failed.size > 0) {
- resubmitFailedStages()
- }
- }
- }
-
- /**
- * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
- * events and responds by launching tasks. This runs in a dedicated thread and receives events
- * via the eventQueue.
- */
- def receive = {
- case event: DAGSchedulerEvent =>
- logDebug("Got event of type " + event.getClass.getName)
-
- if (!processEvent(event))
- submitWaitingStages()
- else
- context.stop(self)
- }
- }))
+ private var eventProcessActor: ActorRef = _
private[scheduler] val nextJobId = new AtomicInteger(0)
@@ -177,6 +154,56 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ /**
+ * Starts the event processing actor. The actor has two responsibilities:
+ *
+ * 1. Waits for events like job submission, task finished, task failure etc., and calls
+ * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+ * 2. Schedules a periodical task to resubmit failed stages.
+ *
+ * NOTE: the actor cannot be started in the constructor, because the periodical task references
+ * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
+ * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+ */
+ def start() {
+ eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+ /**
+ * A handle to the periodical task, used to cancel the task when the actor is stopped.
+ */
+ var resubmissionTask: Cancellable = _
+
+ override def preStart() {
+ /**
+ * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+ * stages. In this way, stage resubmission can be done within the same thread context of
+ * other event processing logic to avoid unnecessary synchronization overhead.
+ */
+ resubmissionTask = context.system.scheduler.schedule(
+ RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+ }
+
+ /**
+ * The main event loop of the DAG scheduler.
+ */
+ def receive = {
+ case event: DAGSchedulerEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+
+ /**
+ * All events are forwarded to `processEvent()`, so that the event processing logic can
+ * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
+ * for details.
+ */
+ if (!processEvent(event)) {
+ submitWaitingStages()
+ } else {
+ resubmissionTask.cancel()
+ context.stop(self)
+ }
+ }
+ }))
+ }
+
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -378,8 +405,10 @@ class DAGScheduler(
}
/**
- * Process one event retrieved from the event queue.
- * Returns true if we should stop the event loop.
+ * Process one event retrieved from the event processing actor.
+ *
+ * @param event The event to be processed.
+ * @return `true` if we should stop the event loop.
*/
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
@@ -457,6 +486,11 @@ class DAGScheduler(
case TaskSetFailed(taskSet, reason) =>
abortStage(stageIdToStage(taskSet.stageId), reason)
+ case ResubmitFailedStages =>
+ if (failed.size > 0) {
+ resubmitFailedStages()
+ }
+
case StopDAGScheduler =>
// Cancel any active jobs
for (job <- activeJobs) {
@@ -900,7 +934,9 @@ class DAGScheduler(
}
def stop() {
- eventProcessActor ! StopDAGScheduler
+ if (eventProcessActor != null) {
+ eventProcessActor ! StopDAGScheduler
+ }
metadataCleaner.cancel()
taskSched.stop()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 708d221d60..5353cd24dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,4 +73,6 @@ private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerE
private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
+private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent