aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCodingCat <zhunansjtu@gmail.com>2014-04-25 16:04:48 -0700
committerMatei Zaharia <matei@databricks.com>2014-04-25 16:04:48 -0700
commit027f1b85f961ce16ee069afe3d90a36dce009994 (patch)
tree01f879fdbbe55590a9dff84d96495e5804317d92 /core
parentdf6d81425bf3b8830988288069f6863de873aee2 (diff)
downloadspark-027f1b85f961ce16ee069afe3d90a36dce009994.tar.gz
spark-027f1b85f961ce16ee069afe3d90a36dce009994.tar.bz2
spark-027f1b85f961ce16ee069afe3d90a36dce009994.zip
SPARK-1235: manage the DAGScheduler EventProcessActor with supervisor and refactor the DAGScheduler with Akka
https://spark-project.atlassian.net/browse/SPARK-1235 In the current implementation, the running job will hang if the DAGScheduler crashes for some reason (eventProcessActor throws exception in receive() ) The reason is that the actor will automatically restart when the exception is thrown during the running but is not captured properly (Akka behaviour), and the JobWaiters are still waiting there for the completion of the tasks In this patch, I refactored the DAGScheduler with Akka and manage the eventProcessActor with supervisor, so that upon the failure of a eventProcessActor, the supervisor will terminate the EventProcessActor and close the SparkContext thanks for @kayousterhout and @markhamstra to give the hints in JIRA Author: CodingCat <zhunansjtu@gmail.com> Author: Xiangrui Meng <meng@databricks.com> Author: Nan Zhu <CodingCat@users.noreply.github.com> Closes #186 from CodingCat/SPARK-1235 and squashes the following commits: a7fb0ee [CodingCat] throw Exception on failure of creating DAG 124d82d [CodingCat] blocking the constructor until event actor is ready baf2d38 [CodingCat] fix the issue brought by non-blocking actorOf 35c886a [CodingCat] fix bug 82d08b3 [CodingCat] calling actorOf on system to ensure it is blocking 310a579 [CodingCat] style fix cd02d9a [Nan Zhu] small fix 561cfbc [CodingCat] recover doCheckpoint c048d0e [CodingCat] call submitWaitingStages for every event a9eea039 [CodingCat] address Matei's comments ac878ab [CodingCat] typo fix 5d1636a [CodingCat] re-trigger the test..... 9dfb033 [CodingCat] remove unnecessary changes a7a2a97 [CodingCat] add StageCancelled message fdf3b17 [CodingCat] just to retrigger the test...... 089bc2f [CodingCat] address andrew's comments 228f4b0 [CodingCat] address comments from Mark b68c1c7 [CodingCat] refactor DAGScheduler with Akka 810efd8 [Xiangrui Meng] akka solution
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala419
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala58
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala2
7 files changed, 290 insertions, 221 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e9d2f57579..eb14d87467 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -300,10 +300,17 @@ class SparkContext(config: SparkConf) extends Logging {
// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
- taskScheduler.start()
+ @volatile private[spark] var dagScheduler: DAGScheduler = _
+ try {
+ dagScheduler = new DAGScheduler(this)
+ } catch {
+ case e: Exception => throw
+ new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
+ }
- @volatile private[spark] var dagScheduler = new DAGScheduler(this)
- dagScheduler.start()
+ // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+ // constructor
+ taskScheduler.start()
private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
@@ -1022,8 +1029,8 @@ class SparkContext(config: SparkConf) extends Logging {
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
- partitions.foreach{ p =>
- require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+ if (dagScheduler == null) {
+ throw new SparkException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
@@ -1132,9 +1139,6 @@ class SparkContext(config: SparkConf) extends Logging {
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R] =
{
- partitions.foreach{ p =>
- require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
- }
val cleanF = clean(processPartition)
val callSite = getCallSite
val waiter = dagScheduler.submitJob(
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e8bbfbf016..3b3524f33e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1142,9 +1142,9 @@ abstract class RDD[T: ClassTag](
@transient private var doCheckpointCalled = false
/**
- * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
- * after a job using this RDD has completed (therefore the RDD has been materialized and
- * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+ * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
+ * has completed (therefore the RDD has been materialized and potentially stored in memory).
+ * doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
if (!doCheckpointCalled) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dbde9b591d..ff411e24a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -22,10 +22,16 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.reflect.ClassTag
import akka.actor._
+import akka.actor.OneForOneStrategy
+import akka.actor.SupervisorStrategy.Stop
+import akka.pattern.ask
+import akka.util.Timeout
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
@@ -47,14 +53,11 @@ import org.apache.spark.util.Utils
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
- * THREADING: This class runs all its logic in a single thread executing the run() method, to which
- * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
- * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
- * should be private.
*/
private[spark]
class DAGScheduler(
- taskScheduler: TaskScheduler,
+ private[scheduler] val sc: SparkContext,
+ private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
@@ -65,6 +68,7 @@ class DAGScheduler(
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
this(
+ sc,
taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
@@ -74,8 +78,6 @@ class DAGScheduler(
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
- private var eventProcessActor: ActorRef = _
-
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -113,50 +115,31 @@ class DAGScheduler(
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
- taskScheduler.setDAGScheduler(this)
+ private val dagSchedulerActorSupervisor =
+ env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
- /**
- * Starts the event processing actor. The actor has two responsibilities:
- *
- * 1. Waits for events like job submission, task finished, task failure etc., and calls
- * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
- * 2. Schedules a periodical task to resubmit failed stages.
- *
- * NOTE: the actor cannot be started in the constructor, because the periodical task references
- * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
- * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
- */
- def start() {
- eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
- /**
- * The main event loop of the DAG scheduler.
- */
- def receive = {
- case event: DAGSchedulerEvent =>
- logTrace("Got event of type " + event.getClass.getName)
-
- /**
- * All events are forwarded to `processEvent()`, so that the event processing logic can
- * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
- * for details.
- */
- if (!processEvent(event)) {
- submitWaitingStages()
- } else {
- context.stop(self)
- }
- }
- }))
+ private[scheduler] var eventProcessActor: ActorRef = _
+
+ private def initializeEventProcessActor() {
+ // blocking the thread until supervisor is started, which ensures eventProcessActor is
+ // not null before any job is submitted
+ implicit val timeout = Timeout(30 seconds)
+ val initEventActorReply =
+ dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
+ eventProcessActor = Await.result(initEventActorReply, timeout.duration).
+ asInstanceOf[ActorRef]
}
+ initializeEventProcessActor()
+
// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessActor ! BeginEvent(task, taskInfo)
}
// Called to report that a task has completed and results are being fetched remotely.
- def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
- eventProcessActor ! GettingResultEvent(task, taskInfo)
+ def taskGettingResult(taskInfo: TaskInfo) {
+ eventProcessActor ! GettingResultEvent(taskInfo)
}
// Called by TaskScheduler to report task completions or failures.
@@ -436,7 +419,7 @@ class DAGScheduler(
{
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
- partitions.find(p => p >= maxPartitions).foreach { p =>
+ partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
@@ -511,6 +494,15 @@ class DAGScheduler(
eventProcessActor ! AllJobsCancelled
}
+ private[scheduler] def doCancelAllJobs() {
+ // Cancel all running jobs.
+ runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+ reason = "as part of cancellation of all jobs"))
+ activeJobs.clear() // These should already be empty by this point,
+ jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
+ submitWaitingStages()
+ }
+
/**
* Cancel all jobs associated with a running or scheduled stage.
*/
@@ -519,147 +511,29 @@ class DAGScheduler(
}
/**
- * Process one event retrieved from the event processing actor.
- *
- * @param event The event to be processed.
- * @return `true` if we should stop the event loop.
- */
- private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
- event match {
- case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
- var finalStage: Stage = null
- try {
- // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
- // whose underlying HDFS files have been deleted.
- finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
- } catch {
- case e: Exception =>
- logWarning("Creating new stage failed due to exception - job: " + jobId, e)
- listener.jobFailed(e)
- return false
- }
- val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
- clearCacheLocs()
- logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
- " output partitions (allowLocal=" + allowLocal + ")")
- logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
- logInfo("Parents of final stage: " + finalStage.parents)
- logInfo("Missing parents: " + getMissingParentStages(finalStage))
- if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
- // Compute very short actions like first() or take() with no parent stages locally.
- listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
- runLocally(job)
- } else {
- jobIdToActiveJob(jobId) = job
- activeJobs += job
- resultStageToJob(finalStage) = job
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
- submitStage(finalStage)
- }
-
- case StageCancelled(stageId) =>
- handleStageCancellation(stageId)
-
- case JobCancelled(jobId) =>
- handleJobCancellation(jobId)
-
- case JobGroupCancelled(groupId) =>
- // Cancel all jobs belonging to this job group.
- // First finds all active jobs with this group id, and then kill stages for them.
- val activeInGroup = activeJobs.filter(activeJob =>
- groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
- val jobIds = activeInGroup.map(_.jobId)
- jobIds.foreach(jobId => handleJobCancellation(jobId,
- "as part of cancelled job group %s".format(groupId)))
-
- case AllJobsCancelled =>
- // Cancel all running jobs.
- runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
- "as part of cancellation of all jobs"))
- activeJobs.clear() // These should already be empty by this point,
- jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
-
- case ExecutorAdded(execId, host) =>
- handleExecutorAdded(execId, host)
-
- case ExecutorLost(execId) =>
- handleExecutorLost(execId)
-
- case BeginEvent(task, taskInfo) =>
- for (
- stage <- stageIdToStage.get(task.stageId);
- stageInfo <- stageToInfos.get(stage)
- ) {
- if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
- !stageInfo.emittedTaskSizeWarning) {
- stageInfo.emittedTaskSizeWarning = true
- logWarning(("Stage %d (%s) contains a task of very large " +
- "size (%d KB). The maximum recommended task size is %d KB.").format(
- task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
- }
- }
- listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
-
- case GettingResultEvent(task, taskInfo) =>
- listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-
- case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
- val stageId = task.stageId
- val taskType = Utils.getFormattedClassName(task)
- listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
- handleTaskCompletion(completion)
-
- case TaskSetFailed(taskSet, reason) =>
- stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
-
- case ResubmitFailedStages =>
- if (failedStages.size > 0) {
- // Failed stages may be removed by job cancellation, so failed might be empty even if
- // the ResubmitFailedStages event has been scheduled.
- resubmitFailedStages()
- }
-
- case StopDAGScheduler =>
- // Cancel any active jobs
- for (job <- activeJobs) {
- val error = new SparkException("Job cancelled because SparkContext was shut down")
- job.listener.jobFailed(error)
- // Tell the listeners that all of the running stages have ended. Don't bother
- // cancelling the stages because if the DAG scheduler is stopped, the entire application
- // is in the process of getting stopped.
- val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
- runningStages.foreach { stage =>
- val info = stageToInfos(stage)
- info.stageFailed(stageFailedMessage)
- listenerBus.post(SparkListenerStageCompleted(info))
- }
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
- }
- return true
- }
- false
- }
-
- /**
* Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
* the last fetch failure.
*/
private[scheduler] def resubmitFailedStages() {
- logInfo("Resubmitting failed stages")
- clearCacheLocs()
- val failedStagesCopy = failedStages.toArray
- failedStages.clear()
- for (stage <- failedStagesCopy.sortBy(_.jobId)) {
- submitStage(stage)
+ if (failedStages.size > 0) {
+ // Failed stages may be removed by job cancellation, so failed might be empty even if
+ // the ResubmitFailedStages event has been scheduled.
+ logInfo("Resubmitting failed stages")
+ clearCacheLocs()
+ val failedStagesCopy = failedStages.toArray
+ failedStages.clear()
+ for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+ submitStage(stage)
+ }
}
+ submitWaitingStages()
}
/**
* Check for waiting or failed stages which are now eligible for resubmission.
* Ordinarily run on every iteration of the event loop.
*/
- private[scheduler] def submitWaitingStages() {
+ private def submitWaitingStages() {
// TODO: We might want to run this less often, when we are sure that something has become
// runnable that wasn't before.
logTrace("Checking for newly runnable parent stages")
@@ -730,6 +604,102 @@ class DAGScheduler(
}
}
+ private[scheduler] def handleJobGroupCancelled(groupId: String) {
+ // Cancel all jobs belonging to this job group.
+ // First finds all active jobs with this group id, and then kill stages for them.
+ val activeInGroup = activeJobs.filter(activeJob =>
+ groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ val jobIds = activeInGroup.map(_.jobId)
+ jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
+ submitWaitingStages()
+ }
+
+ private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
+ for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
+ if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
+ !stageInfo.emittedTaskSizeWarning) {
+ stageInfo.emittedTaskSizeWarning = true
+ logWarning(("Stage %d (%s) contains a task of very large " +
+ "size (%d KB). The maximum recommended task size is %d KB.").format(
+ task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
+ DAGScheduler.TASK_SIZE_TO_WARN))
+ }
+ }
+ listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+ submitWaitingStages()
+ }
+
+ private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) {
+ stageIdToStage.get(taskSet.stageId).foreach {abortStage(_, reason) }
+ submitWaitingStages()
+ }
+
+ private[scheduler] def cleanUpAfterSchedulerStop() {
+ for (job <- activeJobs) {
+ val error = new SparkException("Job cancelled because SparkContext was shut down")
+ job.listener.jobFailed(error)
+ // Tell the listeners that all of the running stages have ended. Don't bother
+ // cancelling the stages because if the DAG scheduler is stopped, the entire application
+ // is in the process of getting stopped.
+ val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+ runningStages.foreach { stage =>
+ val info = stageToInfos(stage)
+ info.stageFailed(stageFailedMessage)
+ listenerBus.post(SparkListenerStageCompleted(info))
+ }
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+ }
+ }
+
+ private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
+ listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
+ submitWaitingStages()
+ }
+
+ private[scheduler] def handleJobSubmitted(jobId: Int,
+ finalRDD: RDD[_],
+ func: (TaskContext, Iterator[_]) => _,
+ partitions: Array[Int],
+ allowLocal: Boolean,
+ callSite: String,
+ listener: JobListener,
+ properties: Properties = null)
+ {
+ var finalStage: Stage = null
+ try {
+ // New stage creation may throw an exception if, for example, jobs are run on a
+ // HadoopRDD whose underlying HDFS files have been deleted.
+ finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+ } catch {
+ case e: Exception =>
+ logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+ listener.jobFailed(e)
+ return
+ }
+ if (finalStage != null) {
+ val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
+ clearCacheLocs()
+ logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
+ job.jobId, callSite, partitions.length, allowLocal))
+ logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+ if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+ // Compute very short actions like first() or take() with no parent stages locally.
+ listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+ runLocally(job)
+ } else {
+ jobIdToActiveJob(jobId) = job
+ activeJobs += job
+ resultStageToJob(finalStage) = job
+ listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
+ properties))
+ submitStage(finalStage)
+ }
+ }
+ submitWaitingStages()
+ }
+
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
@@ -819,9 +789,12 @@ class DAGScheduler(
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
*/
- private def handleTaskCompletion(event: CompletionEvent) {
+ private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
-
+ val stageId = task.stageId
+ val taskType = Utils.getFormattedClassName(task)
+ listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+ event.taskMetrics))
if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
@@ -964,6 +937,7 @@ class DAGScheduler(
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
+ submitWaitingStages()
}
/**
@@ -973,7 +947,7 @@ class DAGScheduler(
* Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
- private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+ private[scheduler] def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
@@ -993,17 +967,19 @@ class DAGScheduler(
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
}
+ submitWaitingStages()
}
- private def handleExecutorAdded(execId: String, host: String) {
+ private[scheduler] def handleExecutorAdded(execId: String, host: String) {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
+ submitWaitingStages()
}
- private def handleStageCancellation(stageId: Int) {
+ private[scheduler] def handleStageCancellation(stageId: Int) {
if (stageIdToJobIds.contains(stageId)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
jobsThatUseStage.foreach(jobId => {
@@ -1012,22 +988,24 @@ class DAGScheduler(
} else {
logInfo("No active jobs to kill for Stage " + stageId)
}
+ submitWaitingStages()
}
- private def handleJobCancellation(jobId: Int, reason: String = "") {
+ private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to cancel unregistered job " + jobId)
} else {
failJobAndIndependentStages(jobIdToActiveJob(jobId),
"Job %d cancelled %s".format(jobId, reason), None)
}
+ submitWaitingStages()
}
/**
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
* being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
*/
- private def abortStage(failedStage: Stage, reason: String) {
+ private[scheduler] def abortStage(failedStage: Stage, reason: String) {
if (!stageIdToStage.contains(failedStage.id)) {
// Skip all the actions if the stage has been removed.
return
@@ -1156,13 +1134,88 @@ class DAGScheduler(
}
def stop() {
- if (eventProcessActor != null) {
- eventProcessActor ! StopDAGScheduler
- }
+ logInfo("Stopping DAGScheduler")
+ dagSchedulerActorSupervisor ! PoisonPill
taskScheduler.stop()
}
}
+private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
+ extends Actor with Logging {
+
+ override val supervisorStrategy =
+ OneForOneStrategy() {
+ case x: Exception =>
+ logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
+ .format(x.getMessage))
+ dagScheduler.doCancelAllJobs()
+ dagScheduler.sc.stop()
+ Stop
+ }
+
+ def receive = {
+ case p: Props => sender ! context.actorOf(p)
+ case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
+ }
+}
+
+private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
+ extends Actor with Logging {
+
+ override def preStart() {
+ // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
+ // valid when the messages arrive
+ dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
+ }
+
+ /**
+ * The main event loop of the DAG scheduler.
+ */
+ def receive = {
+ case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
+ dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
+ listener, properties)
+
+ case StageCancelled(stageId) =>
+ dagScheduler.handleStageCancellation(stageId)
+
+ case JobCancelled(jobId) =>
+ dagScheduler.handleJobCancellation(jobId)
+
+ case JobGroupCancelled(groupId) =>
+ dagScheduler.handleJobGroupCancelled(groupId)
+
+ case AllJobsCancelled =>
+ dagScheduler.doCancelAllJobs()
+
+ case ExecutorAdded(execId, host) =>
+ dagScheduler.handleExecutorAdded(execId, host)
+
+ case ExecutorLost(execId) =>
+ dagScheduler.handleExecutorLost(execId)
+
+ case BeginEvent(task, taskInfo) =>
+ dagScheduler.handleBeginEvent(task, taskInfo)
+
+ case GettingResultEvent(taskInfo) =>
+ dagScheduler.handleGetTaskResult(taskInfo)
+
+ case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+ dagScheduler.handleTaskCompletion(completion)
+
+ case TaskSetFailed(taskSet, reason) =>
+ dagScheduler.handleTaskSetFailed(taskSet, reason)
+
+ case ResubmitFailedStages =>
+ dagScheduler.resubmitFailedStages()
+ }
+
+ override def postStop() {
+ // Cancel any active jobs in postStop hook
+ dagScheduler.cleanUpAfterSchedulerStop()
+ }
+}
+
private[spark] object DAGScheduler {
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0800c5684c..23f57441b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -57,7 +57,7 @@ private[scheduler]
case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler]
-case class GettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+case class GettingResultEvent(taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler] case class CompletionEvent(
task: Task[_],
@@ -76,5 +76,3 @@ private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
-
-private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a81b834267..f3bd0797aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -465,7 +465,7 @@ private[spark] class TaskSetManager(
def handleTaskGettingResult(tid: Long) = {
val info = taskInfos(tid)
info.markGettingResult()
- sched.dagScheduler.taskGettingResult(tasks(info.index), info)
+ sched.dagScheduler.taskGettingResult(info)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ff69eb7e53..d172dd1ac8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -21,6 +21,8 @@ import scala.Tuple2
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.language.reflectiveCalls
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark._
@@ -28,19 +30,16 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-/**
- * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
- * rather than spawning an event loop thread as happens in the real code. They use EasyMock
- * to mock out two classes that DAGScheduler interacts with: TaskScheduler (to which TaskSets are
- * submitted) and BlockManagerMaster (from which cache locations are retrieved and to which dead
- * host notifications are sent). In addition, tests may check for side effects on a non-mocked
- * MapOutputTracker instance.
- *
- * Tests primarily consist of running DAGScheduler#processEvent and
- * DAGScheduler#submitWaitingStages (via test utility functions like runEvent or respondToTaskSet)
- * and capturing the resulting TaskSets from the mock TaskScheduler.
- */
-class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
+class BuggyDAGEventProcessActor extends Actor {
+ val state = 0
+ def receive = {
+ case _ => throw new SparkException("error")
+ }
+}
+
+class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuite
+ with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
@@ -82,6 +81,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
var mapOutputTracker: MapOutputTrackerMaster = null
var scheduler: DAGScheduler = null
+ var dagEventProcessTestActor: TestActorRef[DAGSchedulerEventProcessActor] = null
/**
* Set of cache locations to return from our mock BlockManagerMaster.
@@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
results.clear()
mapOutputTracker = new MapOutputTrackerMaster(conf)
scheduler = new DAGScheduler(
+ sc,
taskScheduler,
sc.listenerBus,
mapOutputTracker,
@@ -131,10 +132,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runLocallyWithinThread(job)
}
}
+ dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+ Props(classOf[DAGSchedulerEventProcessActor], scheduler))(system)
}
- after {
- scheduler.stop()
+ override def afterAll() {
+ super.afterAll()
+ TestKit.shutdownActorSystem(system)
}
/**
@@ -178,8 +182,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
* DAGScheduler event loop.
*/
private def runEvent(event: DAGSchedulerEvent) {
- assert(!scheduler.processEvent(event))
- scheduler.submitWaitingStages()
+ dagEventProcessTestActor.receive(event)
}
/**
@@ -209,7 +212,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
listener: JobListener = jobListener): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
- return jobId
+ jobId
}
/** Sends TaskSetFailed to the scheduler. */
@@ -223,19 +226,17 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
test("zero split job") {
- val rdd = makeRdd(0, Nil)
var numResults = 0
val fakeListener = new JobListener() {
override def taskSucceeded(partition: Int, value: Any) = numResults += 1
override def jobFailed(exception: Exception) = throw exception
}
- submit(rdd, Array(), listener = fakeListener)
+ submit(makeRdd(0, Nil), Array(), listener = fakeListener)
assert(numResults === 0)
}
test("run trivial job") {
- val rdd = makeRdd(1, Nil)
- submit(rdd, Array(0))
+ submit(makeRdd(1, Nil), Array(0))
complete(taskSets(0), List((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
@@ -529,6 +530,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assertDataStructuresEmpty
}
+ test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
+ val actorSystem = ActorSystem("test")
+ val supervisor = actorSystem.actorOf(
+ Props(classOf[DAGSchedulerActorSupervisor], scheduler), "dagSupervisor")
+ supervisor ! Props[BuggyDAGEventProcessActor]
+ val child = expectMsgType[ActorRef]
+ watch(child)
+ child ! "hi"
+ expectMsgPF(){ case Terminated(child) => () }
+ assert(scheduler.sc.dagScheduler === null)
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
@@ -561,3 +574,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.waitingStages.isEmpty)
}
}
+
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2fb750d9ee..a8b605c5b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -305,7 +305,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
-
+ taskScheduler.setDAGScheduler(dagScheduler)
// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))