-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala           63
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala  7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala        1
3 files changed, 42 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c82ae4baa3..c912520fde 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -50,6 +50,10 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
+ * include the new structure. This will help to catch memory leaks.
*/
private[spark]
class DAGScheduler(
@@ -111,6 +115,8 @@ class DAGScheduler(
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
+ private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -128,8 +134,6 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
- private val outputCommitCoordinator = env.outputCommitCoordinator
-
// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -710,9 +714,10 @@ class DAGScheduler(
// cancelling the stages because if the DAG scheduler is stopped, the entire application
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
- runningStages.foreach { stage =>
- stage.latestInfo.stageFailed(stageFailedMessage)
- listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+ // The `toArray` here is necessary so that we don't iterate over `runningStages` while
+ // mutating it.
+ runningStages.toArray.foreach { stage =>
+ markStageAsFinished(stage, Some(stageFailedMessage))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
@@ -887,10 +892,9 @@ class DAGScheduler(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
- // Because we posted SparkListenerStageSubmitted earlier, we should post
- // SparkListenerStageCompleted here in case there are no tasks to run.
- outputCommitCoordinator.stageEnd(stage.id)
- listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+ // Because we posted SparkListenerStageSubmitted earlier, we should mark
+ // the stage as completed here in case there are no tasks to run
+ markStageAsFinished(stage, None)
val debugString = stage match {
case stage: ShuffleMapStage =>
@@ -902,7 +906,6 @@ class DAGScheduler(
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
- runningStages -= stage
}
}
@@ -968,22 +971,6 @@ class DAGScheduler(
}
val stage = stageIdToStage(task.stageId)
-
- def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
- val serviceTime = stage.latestInfo.submissionTime match {
- case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
- case _ => "Unknown"
- }
- if (errorMessage.isEmpty) {
- logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
- stage.latestInfo.completionTime = Some(clock.getTimeMillis())
- } else {
- stage.latestInfo.stageFailed(errorMessage.get)
- logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
- }
- listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
- runningStages -= stage
- }
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
@@ -1099,7 +1086,6 @@ class DAGScheduler(
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
markStageAsFinished(failedStage, Some(failureMessage))
- runningStages -= failedStage
}
if (disallowStageRetryForTest) {
@@ -1216,6 +1202,26 @@ class DAGScheduler(
}
/**
+ * Marks a stage as finished and removes it from the list of running stages.
+ */
+ private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
+ val serviceTime = stage.latestInfo.submissionTime match {
+ case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
+ case _ => "Unknown"
+ }
+ if (errorMessage.isEmpty) {
+ logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+ stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+ } else {
+ stage.latestInfo.stageFailed(errorMessage.get)
+ logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+ }
+ outputCommitCoordinator.stageEnd(stage.id)
+ listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+ runningStages -= stage
+ }
+
+ /**
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
* being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
*/
@@ -1264,8 +1270,7 @@ class DAGScheduler(
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
- stage.latestInfo.stageFailed(failureReason)
- listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+ markStageAsFinished(stage, Some(failureReason))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 9e29fd1382..7c184b1dcb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -60,6 +60,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
/**
+ * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
+ */
+ def isEmpty: Boolean = {
+ authorizedCommittersByStage.isEmpty
+ }
+
+ /**
* Called by tasks to ask whether they can commit their output to HDFS.
*
* If a task attempt has been authorized to commit, then all other attempts to commit the same
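
Aside (not part of the patch): the new `isEmpty` above lets the test suite verify that per-stage commit state is cleaned up once `markStageAsFinished` calls `outputCommitCoordinator.stageEnd`. Below is a toy stand-in showing that leak-check pattern; it is not the real class, and `stageStart` plus the map's value type are assumptions made purely for illustration.

import scala.collection.mutable

// Toy stand-in for the coordinator's per-stage bookkeeping, only to show the
// leak-check pattern; the real OutputCommitCoordinator differs in detail.
class ToyCommitCoordinator {
  private val authorizedCommittersByStage = mutable.Map.empty[Int, mutable.Map[Int, Long]]

  def stageStart(stageId: Int): Unit = {
    authorizedCommittersByStage(stageId) = mutable.Map.empty[Int, Long]
  }

  def stageEnd(stageId: Int): Unit = {
    authorizedCommittersByStage.remove(stageId)
  }

  def isEmpty: Boolean = authorizedCommittersByStage.isEmpty
}

object LeakCheckSketch {
  def main(args: Array[String]): Unit = {
    val coordinator = new ToyCommitCoordinator
    coordinator.stageStart(0)
    coordinator.stageEnd(0)       // markStageAsFinished now guarantees this call
    assert(coordinator.isEmpty)   // mirrors the new assert in DAGSchedulerSuite below
  }
}
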
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 63360a0f18..eb759f0807 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -783,6 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(scheduler.runningStages.isEmpty)
assert(scheduler.shuffleToMapStage.isEmpty)
assert(scheduler.waitingStages.isEmpty)
+ assert(scheduler.outputCommitCoordinator.isEmpty)
}
// Nothing in this test should break if the task info's fields are null, but