aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Lian, Cheng <rhythm.mail@gmail.com> 2013-11-28 17:46:06 +0800
committer: Lian, Cheng <rhythm.mail@gmail.com> 2013-11-28 17:46:06 +0800
commit: 18def5d6f20b33c946f9b8b2cea8cfb6848dcc34 (patch)
tree: 630c8f05e66cbf056c7ed36532e7b7c4d5373640 /core
parent: 743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff)
download: spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.tar.gz
spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.tar.bz2
spark-18def5d6f20b33c946f9b8b2cea8cfb6848dcc34.zip
Bugfix: SPARK-965 & SPARK-966
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966

* Add back DAGScheduler.start(); eventProcessActor is created and started here. Note that this function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor.
* Add a new DAGSchedulerEvent, ResubmitFailedStages. This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.

Please refer to the discussion in SPARK-966 for details.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala 1
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 62
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 2
3 files changed, 40 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3a80241daa..c314f01894 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -270,6 +270,7 @@ class SparkContext(
taskScheduler.start()
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
+ dagScheduler.start()
ui.start()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525ac8..e2bf08c33f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -113,30 +113,7 @@ class DAGScheduler(
// Warns the user if a stage contains a task with size greater than this value (in KB)
val TASK_SIZE_TO_WARN = 100
- private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
- override def preStart() {
- context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
- if (failed.size > 0) {
- resubmitFailedStages()
- }
- }
- }
-
- /**
- * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
- * events and responds by launching tasks. This runs in a dedicated thread and receives events
- * via the eventQueue.
- */
- def receive = {
- case event: DAGSchedulerEvent =>
- logDebug("Got event of type " + event.getClass.getName)
-
- if (!processEvent(event))
- submitWaitingStages()
- else
- context.stop(self)
- }
- }))
+ private var eventProcessActor: ActorRef = _
private[scheduler] val nextJobId = new AtomicInteger(0)
@@ -177,6 +154,34 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ def start() {
+ eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+ var resubmissionTask: Cancellable = _
+
+ override def preStart() {
+ resubmissionTask = context.system.scheduler.schedule(
+ RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+ }
+
+ /**
+ * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+ * events and responds by launching tasks. This runs in a dedicated thread and receives events
+ * via the eventQueue.
+ */
+ def receive = {
+ case event: DAGSchedulerEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+
+ if (!processEvent(event)) {
+ submitWaitingStages()
+ } else {
+ resubmissionTask.cancel()
+ context.stop(self)
+ }
+ }
+ }))
+ }
+
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -457,6 +462,11 @@ class DAGScheduler(
case TaskSetFailed(taskSet, reason) =>
abortStage(stageIdToStage(taskSet.stageId), reason)
+ case ResubmitFailedStages =>
+ if (failed.size > 0) {
+ resubmitFailedStages()
+ }
+
case StopDAGScheduler =>
// Cancel any active jobs
for (job <- activeJobs) {
@@ -900,7 +910,9 @@ class DAGScheduler(
}
def stop() {
- eventProcessActor ! StopDAGScheduler
+ if (eventProcessActor != null) {
+ eventProcessActor ! StopDAGScheduler
+ }
metadataCleaner.cancel()
taskSched.stop()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 708d221d60..5353cd24dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,4 +73,6 @@ private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerE
private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
+private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent