Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Pool.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala  20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala  18
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala  7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala  50
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala  5
17 files changed, 117 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5987cfea2e..732c89c39f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,10 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}
- override def checkSpeculatableTasks(): Boolean = {
+ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
- shouldRevive |= schedulable.checkSpeculatableTasks()
+ shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
}
shouldRevive
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index ab00bc8f0b..b6f88ed0a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -43,6 +43,6 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
- def checkSpeculatableTasks(): Boolean
+ def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
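
The two hunks above thread a new minTimeToSpeculation argument from the scheduler through every Schedulable down to the leaf task set managers. A minimal, self-contained sketch of that call pattern, using made-up Mini* names rather than the real Spark classes: the pool ORs its children's answers, so a single speculatable task set is enough to revive offers.

    import scala.collection.mutable.ArrayBuffer

    trait MiniSchedulable {
      def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
    }

    class MiniPool(children: ArrayBuffer[MiniSchedulable]) extends MiniSchedulable {
      override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
        var shouldRevive = false
        for (child <- children) {
          // OR the children together: any speculatable task set is enough to revive offers.
          shouldRevive |= child.checkSpeculatableTasks(minTimeToSpeculation)
        }
        shouldRevive
      }
    }

    class MiniTaskSet(longestRunningMs: Long) extends MiniSchedulable {
      // A leaf only speculates on tasks that have run longer than the floor passed down to it.
      override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean =
        longestRunningMs > minTimeToSpeculation
    }

    object ThreadingSketch extends App {
      val root = new MiniPool(ArrayBuffer(new MiniTaskSet(50), new MiniTaskSet(500)))
      println(root.checkSpeculatableTasks(100))  // true: the 500 ms task clears the 100 ms floor
    }
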
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index a42990addb..2d89232ba2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
import scala.collection.mutable.ListBuffer
+import org.apache.spark.TaskState
+import org.apache.spark.TaskState.TaskState
import org.apache.spark.annotation.DeveloperApi
/**
@@ -58,24 +60,26 @@ class TaskInfo(
var failed = false
+ var killed = false
+
private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
gettingResultTime = time
}
- private[spark] def markSuccessful(time: Long = System.currentTimeMillis) {
+ private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
finishTime = time
- }
-
- private[spark] def markFailed(time: Long = System.currentTimeMillis) {
- finishTime = time
- failed = true
+ if (state == TaskState.FAILED) {
+ failed = true
+ } else if (state == TaskState.KILLED) {
+ killed = true
+ }
}
def gettingResult: Boolean = gettingResultTime != 0
def finished: Boolean = finishTime != 0
- def successful: Boolean = finished && !failed
+ def successful: Boolean = finished && !failed && !killed
def running: Boolean = !finished
@@ -88,6 +92,8 @@ class TaskInfo(
}
} else if (failed) {
"FAILED"
+ } else if (killed) {
+ "KILLED"
} else if (successful) {
"SUCCESS"
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 01e85ca405..5cb1af9db0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -65,6 +65,11 @@ private[spark] class TaskSchedulerImpl(
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
+ // Duplicate copies of a task will only be launched if the original copy has been running for
+ // at least this amount of time. This is to avoid the overhead of launching speculative copies
+ // of tasks that are very short.
+ val MIN_TIME_TO_SPECULATION = 100
+
private val speculationScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
@@ -463,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
def checkSpeculatableTasks() {
var shouldRevive = false
synchronized {
- shouldRevive = rootPool.checkSpeculatableTasks()
+ shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
}
if (shouldRevive) {
backend.reviveOffers()
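
On the scheduler side, checkSpeculatableTasks() now passes MIN_TIME_TO_SPECULATION to the root pool and revives offers only when something is speculatable. A hedged sketch of that periodic loop; rootCheck and reviveOffers below are stand-ins for rootPool.checkSpeculatableTasks and backend.reviveOffers, not Spark's actual wiring.

    import java.util.concurrent.{Executors, TimeUnit}

    object SpeculationLoopSketch extends App {
      val intervalMs = 100L            // stands in for SPECULATION_INTERVAL_MS
      val minTimeToSpeculation = 100   // stands in for MIN_TIME_TO_SPECULATION

      // Stand-ins for rootPool.checkSpeculatableTasks and backend.reviveOffers.
      def rootCheck(minTime: Int): Boolean = true
      def reviveOffers(): Unit = println("reviving offers")

      val scheduler = Executors.newSingleThreadScheduledExecutor()
      scheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = {
          if (rootCheck(minTimeToSpeculation)) reviveOffers()
        }
      }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

      Thread.sleep(350)   // let the check fire a few times, then stop
      scheduler.shutdownNow()
    }
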
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 08d33f688a..2eedd201ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -608,7 +608,7 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
- info.markSuccessful()
+ info.markFinished(TaskState.FINISHED)
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -617,6 +617,14 @@ private[spark] class TaskSetManager(
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+ // Kill any other attempts for the same task (since those are unnecessary now that one
+ // attempt completed successfully).
+ for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+ logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
+ s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
+ s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+ sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+ }
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
@@ -640,11 +648,11 @@ private[spark] class TaskSetManager(
*/
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
val info = taskInfos(tid)
- if (info.failed) {
+ if (info.failed || info.killed) {
return
}
removeRunningTask(tid)
- info.markFailed()
+ info.markFinished(state)
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
@@ -821,7 +829,7 @@ private[spark] class TaskSetManager(
* TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
* we don't scan the whole task set. It might also help to make this sorted by launch time.
*/
- override def checkSpeculatableTasks(): Boolean = {
+ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
// Can't speculate if we only have one task, and no need to speculate if the task set is a
// zombie.
if (isZombie || numTasks == 1) {
@@ -835,7 +843,7 @@ private[spark] class TaskSetManager(
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
- val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
+ val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
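
The threshold logic touched in the last hunk takes the median duration of successful tasks, multiplies it by the speculation multiplier, and never lets the result drop below minTimeToSpeculation. A small standalone sketch of that arithmetic, with assumed example durations:

    import java.util.Arrays
    import scala.math.{max, min}

    object ThresholdSketch extends App {
      val speculationMultiplier = 1.5              // stands in for SPECULATION_MULTIPLIER
      val minTimeToSpeculation = 100               // the new lower bound passed in
      val durations = Array(40L, 60L, 55L, 45L)    // successful task durations in ms
      val tasksSuccessful = durations.length

      Arrays.sort(durations)
      val medianDuration =
        durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
      val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
      println(threshold)  // 100.0: 1.5 * 55 ms = 82.5 ms, so the 100 ms floor applies
    }
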
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 1aa85d60ea..4e2fe5e0e5 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -337,6 +337,7 @@ private[spark] object UIUtils extends Logging {
completed: Int,
failed: Int,
skipped: Int,
+ killed: Int,
total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
// started + completed can be > total when there are speculative tasks
@@ -348,6 +349,7 @@ private[spark] object UIUtils extends Logging {
{completed}/{total}
{ if (failed > 0) s"($failed failed)" }
{ if (skipped > 0) s"($skipped skipped)" }
+ { if (killed > 0) s"($killed killed)" }
</span>
<div class="bar bar-completed" style={completeWidth}></div>
<div class="bar bar-running" style={startWidth}></div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 373c26be4c..035d70601c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -256,7 +256,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
- failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+ failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
</tr>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index f609fb4cd2..293f1438b8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -57,6 +57,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<th>Task Time</th>
<th>Total Tasks</th>
<th>Failed Tasks</th>
+ <th>Killed Tasks</th>
<th>Succeeded Tasks</th>
{if (hasInput) {
<th>
@@ -116,8 +117,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
- <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td>
<td>{v.failedTasks}</td>
+ <td>{v.killedTasks}</td>
<td>{v.succeededTasks}</td>
{if (stageData.hasInput) {
<td sorttable_customkey={v.inputBytes.toString}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 842f42b4c9..c8827403fc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -369,6 +369,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
taskEnd.reason match {
case Success =>
execSummary.succeededTasks += 1
+ case TaskKilled =>
+ execSummary.killedTasks += 1
case _ =>
execSummary.failedTasks += 1
}
@@ -381,6 +383,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.completedIndices.add(info.index)
stageData.numCompleteTasks += 1
None
+ case TaskKilled =>
+ stageData.numKilledTasks += 1
+ Some(TaskKilled.toErrorString)
case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
stageData.numFailedTasks += 1
Some(e.toErrorString)
@@ -409,6 +414,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
taskEnd.reason match {
case Success =>
jobData.numCompletedTasks += 1
+ case TaskKilled =>
+ jobData.numKilledTasks += 1
case _ =>
jobData.numFailedTasks += 1
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 2a1c3c1a50..0e020155a6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -195,7 +195,7 @@ private[ui] class StageTableBase(
<td class="progress-cell">
{UIUtils.makeProgressBar(started = stageData.numActiveTasks,
completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
- skipped = 0, total = s.numTasks)}
+ skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)}
</td>
<td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
<td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index d76a0e657c..20dde7cec8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -33,6 +33,7 @@ private[spark] object UIData {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
+ var killedTasks : Int = 0
var inputBytes : Long = 0
var inputRecords : Long = 0
var outputBytes : Long = 0
@@ -63,6 +64,7 @@ private[spark] object UIData {
var numCompletedTasks: Int = 0,
var numSkippedTasks: Int = 0,
var numFailedTasks: Int = 0,
+ var numKilledTasks: Int = 0,
/* Stages */
var numActiveStages: Int = 0,
// This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
@@ -76,6 +78,7 @@ private[spark] object UIData {
var numCompleteTasks: Int = _
var completedIndices = new OpenHashSet[Int]()
var numFailedTasks: Int = _
+ var numKilledTasks: Int = _
var executorRunTime: Long = _
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 18547d459e..022b226894 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -280,6 +280,7 @@ private[spark] object JsonProtocol {
("Getting Result Time" -> taskInfo.gettingResultTime) ~
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
+ ("Killed" -> taskInfo.killed) ~
("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
}
@@ -697,6 +698,7 @@ private[spark] object JsonProtocol {
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
+ val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
@@ -707,6 +709,7 @@ private[spark] object JsonProtocol {
taskInfo.gettingResultTime = gettingResultTime
taskInfo.finishTime = finishTime
taskInfo.failed = failed
+ taskInfo.killed = killed
accumulables.foreach { taskInfo.accumulables += _ }
taskInfo
}
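
The read path above defaults the new "Killed" field to false so event logs written before this change still deserialize. A hedged sketch of the same extractOpt/getOrElse pattern, assuming json4s with the jackson backend on the classpath (as JsonProtocol uses):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object KilledFieldSketch extends App {
      implicit val formats: Formats = DefaultFormats

      // Missing fields yield None from extractOpt, so old logs fall back to false.
      def readKilled(json: String): Boolean =
        (parse(json) \ "Killed").extractOpt[Boolean].getOrElse(false)

      println(readKilled("""{"Failed": false, "Killed": true}"""))  // true
      println(readKilled("""{"Failed": false}"""))                  // false: field absent
    }
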
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9b7b945bf3..1d7c8f4a61 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,6 +22,8 @@ import java.util.Random
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.mockito.Mockito.{mock, verify}
+
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -789,6 +791,54 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
}
+ test("Kill other task attempts when one attempt belonging to the same task succeeds") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(4)
+ // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+ sc.conf.set("spark.speculation.multiplier", "0.0")
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+ // Offer resources for 4 tasks to start
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(k, v, NO_PREF)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === k)
+ }
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+ // Complete the 3 tasks and leave 1 task in running
+ for (id <- Set(0, 1, 2)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ assert(manager.checkSpeculatableTasks(0))
+ // Offer resource to start the speculative attempt for the running task
+ val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+ assert(taskOption5.isDefined)
+ val task5 = taskOption5.get
+ assert(task5.index === 3)
+ assert(task5.taskId === 4)
+ assert(task5.executorId === "exec1")
+ assert(task5.attemptNumber === 1)
+ sched.backend = mock(classOf[SchedulerBackend])
+ // Complete the speculative attempt for the running task
+ manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
+ // Verify that it kills other running attempt
+ verify(sched.backend).killTask(3, "exec2", true)
+ // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
+ // killed, so the FakeTaskScheduler is only told about the successful completion
+ // of the speculated task.
+ assert(sched.endedTasks(3) === Success)
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index b83ffa3282..6d726d3d59 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -83,7 +83,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
- taskInfo.markSuccessful()
+ taskInfo.markFinished(TaskState.FINISHED)
val taskMetrics = TaskMetrics.empty
taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index 58beaf103c..6335d905c0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -110,7 +110,7 @@ class UIUtilsSuite extends SparkFunSuite {
}
test("SPARK-11906: Progress bar should not overflow because of speculative tasks") {
- val generated = makeProgressBar(2, 3, 0, 0, 4).head.child.filter(_.label == "div")
+ val generated = makeProgressBar(2, 3, 0, 0, 0, 4).head.child.filter(_.label == "div")
val expected = Seq(
<div class="bar bar-completed" style="width: 75.0%"></div>,
<div class="bar bar-running" style="width: 25.0%"></div>
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 1fa9b28edf..edab727fc4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -243,7 +243,6 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
new FetchFailed(null, 0, 0, 0, "ignored"),
ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
- TaskKilled,
ExecutorLostFailure("0", true, Some("Induced failure")),
UnknownReason)
var failCount = 0
@@ -255,6 +254,11 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
}
+ // Make sure killed tasks are accounted for correctly.
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, TaskKilled, taskInfo, metrics))
+ assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === 1)
+
// Make sure we count success as success.
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6fda7378e6..0a8bbba6c5 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -966,6 +966,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Getting Result Time": 0,
| "Finish Time": 0,
| "Failed": false,
+ | "Killed": false,
| "Accumulables": [
| {
| "ID": 1,
@@ -1012,6 +1013,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Getting Result Time": 0,
| "Finish Time": 0,
| "Failed": false,
+ | "Killed": false,
| "Accumulables": [
| {
| "ID": 1,
@@ -1064,6 +1066,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Getting Result Time": 0,
| "Finish Time": 0,
| "Failed": false,
+ | "Killed": false,
| "Accumulables": [
| {
| "ID": 1,
@@ -1161,6 +1164,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Getting Result Time": 0,
| "Finish Time": 0,
| "Failed": false,
+ | "Killed": false,
| "Accumulables": [
| {
| "ID": 1,
@@ -1258,6 +1262,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Getting Result Time": 0,
| "Finish Time": 0,
| "Failed": false,
+ | "Killed": false,
| "Accumulables": [
| {
| "ID": 1,