path: root/core
author     Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>    2016-03-14 12:31:46 -0700
committer  Andrew Or <andrew@databricks.com>                         2016-03-14 12:31:46 -0700
commit     23385e853e7ca54332c6098cf83da7d0723546fe (patch)
tree       35df872b40d6cf33114b9f91e1cba33277548730 /core
parent     e06493cb7b790623a9106241a8d496ecea703328 (diff)
download   spark-23385e853e7ca54332c6098cf83da7d0723546fe.tar.gz
           spark-23385e853e7ca54332c6098cf83da7d0723546fe.tar.bz2
           spark-23385e853e7ca54332c6098cf83da7d0723546fe.zip
[SPARK-13054] Always post TaskEnd event for tasks
I am using dynamic container allocation and speculation and am seeing issues with the active task accounting. The Executor UI still shows active tasks on an executor even though the job/stage has completed. I think it is also preventing dynamic allocation from releasing containers, because it believes there are still running tasks. There are multiple issues with this:

- If the task end event for a task (in this case probably because of speculation) comes in after the stage has finished, DAGScheduler.handleTaskCompletion skips posting the task completion event.

Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>
Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@yahoo-inc.com>

Closes #10951 from tgravescs/SPARK-11701.
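For context, here is a minimal, illustrative listener of the kind the description above refers to. It is not part of this patch; the class name and the registration via sc.addSparkListener are assumptions made for the example. If a TaskEnd event is never delivered for a task that finishes after its stage, a listener like this over-counts active tasks indefinitely:

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Hypothetical listener that does its own active-task accounting; register it
// with sc.addSparkListener(new ActiveTaskTrackingListener).
class ActiveTaskTrackingListener extends SparkListener {
  private val activeTasks = mutable.HashSet[Long]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    activeTasks += taskStart.taskInfo.taskId
  }

  // If this event is dropped (e.g. for a speculative task that completes after
  // its stage has finished), the task id stays in the set forever.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    activeTasks -= taskEnd.taskInfo.taskId
  }

  def numActiveTasks: Int = synchronized { activeTasks.size }
}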
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 58
2 files changed, 65 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b576d4c5f3..8a36af27bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1148,13 +1148,13 @@ class DAGScheduler(
null
}
- // The success case is dealt with separately below.
- // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
- if (event.reason != Success) {
- val attemptId = task.stageAttemptId
- listenerBus.post(SparkListenerTaskEnd(
- stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
- }
+ // The stage may have already finished when we get this event -- e.g. maybe it was a
+ // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+ // are properly notified and can choose to handle it. For instance, some listeners do
+ // their own accounting and, if they don't get the task end event, they think
+ // tasks are still running when they really aren't.
+ listenerBus.post(SparkListenerTaskEnd(
+ stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
@@ -1164,8 +1164,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
- listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
- event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
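After this change a listener can receive a TaskEnd for a stage it has already seen complete, for example a speculative duplicate of a task that was already satisfied. The sketch below is purely illustrative (the class name and the handling policy are assumptions, not part of this patch); it shows one way a listener could recognize such late events by remembering completed (stageId, stageAttemptId) pairs:

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Hypothetical listener: remembers which stage attempts have completed so that a
// late TaskEnd (now delivered for successful tasks as well) can be told apart
// from a normal one instead of corrupting per-stage counters.
class LateTaskEndAwareListener extends SparkListener {
  private val completedStageAttempts = mutable.HashSet[(Int, Int)]()

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
    val info = stageCompleted.stageInfo
    completedStageAttempts += ((info.stageId, info.attemptId))
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    if (completedStageAttempts.contains((taskEnd.stageId, taskEnd.stageAttemptId))) {
      // Task ended after its stage completed: skip per-stage bookkeeping here,
      // but the event itself is still delivered thanks to this patch.
    }
  }
}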
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d1c7143abf..55f4190680 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
+ val endedTasks = new HashSet[Long]
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
failedStages += stageInfo.stageId
}
}
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ endedTasks += taskEnd.taskInfo.taskId
+ }
}
var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
+ sparkListener.endedTasks.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(countSubmittedMapStageAttempts() === 2)
}
+ test("task events always posted in speculation / when stage is killed") {
+ val baseRdd = new MyRDD(sc, 4, Nil)
+ val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
+ submit(finalRdd, Array(0, 1, 2, 3))
+
+ // complete two tasks
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(0), Success, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(1), Success, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ // verify stage exists
+ assert(scheduler.stageIdToStage.contains(0))
+ assert(sparkListener.endedTasks.size == 2)
+
+ // finish other 2 tasks
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(2), Success, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(3), Success, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sparkListener.endedTasks.size == 4)
+
+ // verify the stage is done
+ assert(!scheduler.stageIdToStage.contains(0))
+
+ // The stage should be complete. Finish one other successful task to simulate what can happen
+ // with a speculative task, and make sure the event is sent out.
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(3), Success, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sparkListener.endedTasks.size == 5)
+
+ // make sure non-successful tasks also send out the event
+ runEvent(makeCompletionEvent(
+ taskSets(0).tasks(3), UnknownReason, 42,
+ Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+ sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ assert(sparkListener.endedTasks.size == 6)
+ }
+
test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1944,6 +1996,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
info
}
+ private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
+ val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+ info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+ info
+ }
+
private def makeCompletionEvent(
task: Task[_],
reason: TaskEndReason,