Diffstat (limited to 'core/src')
17 files changed, 117 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5987cfea2e..732c89c39f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,10 +88,10 @@ private[spark] class Pool(
     schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
   }
 
-  override def checkSpeculatableTasks(): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
     var shouldRevive = false
     for (schedulable <- schedulableQueue.asScala) {
-      shouldRevive |= schedulable.checkSpeculatableTasks()
+      shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
     }
     shouldRevive
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index ab00bc8f0b..b6f88ed0a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -43,6 +43,6 @@ private[spark] trait Schedulable {
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
-  def checkSpeculatableTasks(): Boolean
+  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index a42990addb..2d89232ba2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable.ListBuffer
 
+import org.apache.spark.TaskState
+import org.apache.spark.TaskState.TaskState
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -58,24 +60,26 @@ class TaskInfo(
 
   var failed = false
 
+  var killed = false
+
   private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
     gettingResultTime = time
   }
 
-  private[spark] def markSuccessful(time: Long = System.currentTimeMillis) {
+  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
     finishTime = time
-  }
-
-  private[spark] def markFailed(time: Long = System.currentTimeMillis) {
-    finishTime = time
-    failed = true
+    if (state == TaskState.FAILED) {
+      failed = true
+    } else if (state == TaskState.KILLED) {
+      killed = true
+    }
   }
 
   def gettingResult: Boolean = gettingResultTime != 0
 
   def finished: Boolean = finishTime != 0
 
-  def successful: Boolean = finished && !failed
+  def successful: Boolean = finished && !failed && !killed
 
   def running: Boolean = !finished
 
@@ -88,6 +92,8 @@ class TaskInfo(
       }
     } else if (failed) {
       "FAILED"
+    } else if (killed) {
+      "KILLED"
     } else if (successful) {
       "SUCCESS"
     } else {
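The TaskInfo change above folds markSuccessful and markFailed into a single markFinished(state) transition and adds a killed flag, so an attempt killed after a sibling attempt succeeds is no longer reported as successful. A minimal standalone sketch of the resulting state model (the Sketch names are illustrative stand-ins, not Spark classes):

```scala
// Stand-alone sketch of the lifecycle markFinished models.
object TaskStateSketch extends Enumeration {
  val RUNNING, FINISHED, FAILED, KILLED = Value
}

class TaskLifecycleSketch {
  var finishTime: Long = 0L
  var failed = false
  var killed = false

  // One finish transition records both when and how the attempt ended.
  def markFinished(state: TaskStateSketch.Value,
                   time: Long = System.currentTimeMillis): Unit = {
    finishTime = time
    if (state == TaskStateSketch.FAILED) failed = true
    else if (state == TaskStateSketch.KILLED) killed = true
  }

  def finished: Boolean = finishTime != 0
  // "Successful" now excludes killed attempts as well as failed ones.
  def successful: Boolean = finished && !failed && !killed
}
```

Routing every terminal state through one method keeps finishTime and the outcome flags from drifting apart.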
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 01e85ca405..5cb1af9db0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -65,6 +65,11 @@ private[spark] class TaskSchedulerImpl(
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
+  // Duplicate copies of a task will only be launched if the original copy has been running for
+  // at least this amount of time. This is to avoid the overhead of launching speculative copies
+  // of tasks that are very short.
+  val MIN_TIME_TO_SPECULATION = 100
+
   private val speculationScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
 
@@ -463,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
   def checkSpeculatableTasks() {
     var shouldRevive = false
     synchronized {
-      shouldRevive = rootPool.checkSpeculatableTasks()
+      shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
     }
     if (shouldRevive) {
       backend.reviveOffers()
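For context on where MIN_TIME_TO_SPECULATION flows: the scheduler polls the root pool every SPECULATION_INTERVAL_MS and revives offers only when some task set reports a speculatable task. A hedged sketch of that loop, with checkAndRevive and reviveOffers as stand-ins for rootPool.checkSpeculatableTasks and backend.reviveOffers:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Simplified wiring, not the Spark source: poll on a fixed cadence and thread the
// minimum-runtime floor down to whoever decides about speculation.
object SpeculationLoopSketch {
  val SPECULATION_INTERVAL_MS = 100L
  val MIN_TIME_TO_SPECULATION = 100 // ms a task must run before it may be speculated

  def start(checkAndRevive: Int => Boolean, reviveOffers: () => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        if (checkAndRevive(MIN_TIME_TO_SPECULATION)) {
          reviveOffers() // ask the backend for fresh resource offers
        }
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```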
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 08d33f688a..2eedd201ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -608,7 +608,7 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markSuccessful()
+    info.markFinished(TaskState.FINISHED)
     removeRunningTask(tid)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -617,6 +617,14 @@ private[spark] class TaskSetManager(
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
     sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+    // Kill any other attempts for the same task (since those are unnecessary now that one
+    // attempt completed successfully).
+    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
+      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
+        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
+        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
+    }
     if (!successful(index)) {
       tasksSuccessful += 1
       logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
@@ -640,11 +648,11 @@
    */
   def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
     val info = taskInfos(tid)
-    if (info.failed) {
+    if (info.failed || info.killed) {
       return
     }
     removeRunningTask(tid)
-    info.markFailed()
+    info.markFinished(state)
     val index = info.index
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
@@ -821,7 +829,7 @@ private[spark] class TaskSetManager(
    * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
    * we don't scan the whole task set. It might also help to make this sorted by launch time.
    */
-  override def checkSpeculatableTasks(): Boolean = {
+  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
     // Can't speculate if we only have one task, and no need to speculate if the task set is a
     // zombie.
     if (isZombie || numTasks == 1) {
@@ -835,7 +843,7 @@ private[spark] class TaskSetManager(
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
       val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
-      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
+      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
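The only functional change in the speculation check is that the hard-coded 100 ms floor becomes the minTimeToSpeculation parameter. The surrounding math, extracted into a self-contained sketch (the helper and its name are hypothetical; the formula mirrors the patch):

```scala
import java.util.Arrays
import scala.math.{max, min}

object SpeculationThresholdSketch {
  // A running task becomes a speculation candidate once its runtime exceeds the
  // scaled duration of already-successful tasks, floored at minTimeToSpeculation.
  def threshold(
      successfulDurations: Array[Long],
      tasksSuccessful: Int,
      speculationMultiplier: Double,
      minTimeToSpeculation: Int): Double = {
    val durations = successfulDurations.clone()
    Arrays.sort(durations)
    val medianDuration =
      durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))
    max(speculationMultiplier * medianDuration, minTimeToSpeculation)
  }
}

// With durations 40/50/60 ms, 3 successes and multiplier 1.5, the formula picks
// durations(2) = 60 and scales it to 90 ms, so the 100 ms floor wins:
// SpeculationThresholdSketch.threshold(Array(40L, 50L, 60L), 3, 1.5, 100) == 100.0
```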
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 1aa85d60ea..4e2fe5e0e5 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -337,6 +337,7 @@ private[spark] object UIUtils extends Logging {
       completed: Int,
       failed: Int,
       skipped: Int,
+      killed: Int,
       total: Int): Seq[Node] = {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     // started + completed can be > total when there are speculative tasks
@@ -348,6 +349,7 @@
         {completed}/{total}
         { if (failed > 0) s"($failed failed)" }
         { if (skipped > 0) s"($skipped skipped)" }
+        { if (killed > 0) s"($killed killed)" }
       </span>
       <div class="bar bar-completed" style={completeWidth}></div>
       <div class="bar bar-running" style={startWidth}></div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 373c26be4c..035d70601c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -256,7 +256,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       </td>
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
-        failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+        failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
         total = job.numTasks - job.numSkippedTasks)}
       </td>
     </tr>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index f609fb4cd2..293f1438b8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -57,6 +57,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
       <th>Task Time</th>
       <th>Total Tasks</th>
       <th>Failed Tasks</th>
+      <th>Killed Tasks</th>
       <th>Succeeded Tasks</th>
       {if (hasInput) {
         <th>
@@ -116,8 +117,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
         <td>{k}</td>
         <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
         <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
-        <td>{v.failedTasks + v.succeededTasks}</td>
+        <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td>
         <td>{v.failedTasks}</td>
+        <td>{v.killedTasks}</td>
         <td>{v.succeededTasks}</td>
         {if (stageData.hasInput) {
           <td sorttable_customkey={v.inputBytes.toString}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 842f42b4c9..c8827403fc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -369,6 +369,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       taskEnd.reason match {
         case Success =>
           execSummary.succeededTasks += 1
+        case TaskKilled =>
+          execSummary.killedTasks += 1
         case _ =>
           execSummary.failedTasks += 1
       }
@@ -381,6 +383,9 @@
           stageData.completedIndices.add(info.index)
           stageData.numCompleteTasks += 1
           None
+        case TaskKilled =>
+          stageData.numKilledTasks += 1
+          Some(TaskKilled.toErrorString)
         case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
           stageData.numFailedTasks += 1
           Some(e.toErrorString)
@@ -409,6 +414,8 @@
         taskEnd.reason match {
           case Success =>
             jobData.numCompletedTasks += 1
+          case TaskKilled =>
+            jobData.numKilledTasks += 1
           case _ =>
             jobData.numFailedTasks += 1
         }
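All of the UI changes above hang off one accounting rule in JobProgressListener: TaskKilled must be matched before the wildcard, otherwise killed attempts keep landing in the failure bucket. A toy, self-contained model of the three-way split (these case objects shadow Spark's names but are defined locally):

```scala
// Local stand-ins for Spark's TaskEndReason hierarchy, just to show the match order.
sealed trait EndReason
case object Success extends EndReason
case object TaskKilled extends EndReason
final case class ExceptionFailure(error: String) extends EndReason

final case class TaskCounts(succeeded: Int = 0, killed: Int = 0, failed: Int = 0) {
  def record(reason: EndReason): TaskCounts = reason match {
    case Success    => copy(succeeded = succeeded + 1)
    case TaskKilled => copy(killed = killed + 1)  // must precede the wildcard,
    case _          => copy(failed = failed + 1)  // or killed still counts as failed
  }
}
```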
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 2a1c3c1a50..0e020155a6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -195,7 +195,7 @@ private[ui] class StageTableBase(
       <td class="progress-cell">
         {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
           completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
-          skipped = 0, total = s.numTasks)}
+          skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)}
       </td>
       <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
       <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index d76a0e657c..20dde7cec8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -33,6 +33,7 @@ private[spark] object UIData {
     var taskTime : Long = 0
     var failedTasks : Int = 0
     var succeededTasks : Int = 0
+    var killedTasks : Int = 0
    var inputBytes : Long = 0
     var inputRecords : Long = 0
     var outputBytes : Long = 0
@@ -63,6 +64,7 @@
       var numCompletedTasks: Int = 0,
       var numSkippedTasks: Int = 0,
       var numFailedTasks: Int = 0,
+      var numKilledTasks: Int = 0,
       /* Stages */
       var numActiveStages: Int = 0,
       // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
@@ -76,6 +78,7 @@
     var numCompleteTasks: Int = _
     var completedIndices = new OpenHashSet[Int]()
     var numFailedTasks: Int = _
+    var numKilledTasks: Int = _
     var executorRunTime: Long = _
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 18547d459e..022b226894 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -280,6 +280,7 @@ private[spark] object JsonProtocol {
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
+    ("Killed" -> taskInfo.killed) ~
     ("Accumulables" -> JArray(taskInfo.accumulables.map(accumulableInfoToJson).toList))
   }
 
@@ -697,6 +698,7 @@ private[spark] object JsonProtocol {
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
+    val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
     val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
       case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq[AccumulableInfo]()
@@ -707,6 +709,7 @@
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
+    taskInfo.killed = killed
     accumulables.foreach { taskInfo.accumulables += _ }
     taskInfo
   }
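On the JsonProtocol read side, extractOpt with a false default matters because event logs written before this change have no "Killed" field, and a plain extract[Boolean] would fail on replay. A self-contained check of that pattern (assumes json4s on the classpath, which JsonProtocol already uses):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object KilledFieldCompatSketch {
  implicit val formats: Formats = DefaultFormats

  // Old logs lack the field entirely; default instead of failing the replay.
  def readKilled(json: JValue): Boolean =
    (json \ "Killed").extractOpt[Boolean].getOrElse(false)

  def main(args: Array[String]): Unit = {
    assert(!readKilled(parse("""{"Failed": false}""")))  // pre-change log: field absent
    assert(readKilled(parse("""{"Killed": true}""")))    // post-change log: field present
  }
}
```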
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9b7b945bf3..1d7c8f4a61 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -22,6 +22,8 @@ import java.util.Random
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.mockito.Mockito.{mock, verify}
+
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -789,6 +791,54 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
   }
 
+  test("Kill other task attempts when one attempt belonging to the same task succeeds") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    // Complete the 3 tasks and leave 1 task in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    assert(manager.checkSpeculatableTasks(0))
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val task5 = taskOption5.get
+    assert(task5.index === 3)
+    assert(task5.taskId === 4)
+    assert(task5.executorId === "exec1")
+    assert(task5.attemptNumber === 1)
+    sched.backend = mock(classOf[SchedulerBackend])
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
+    // Verify that it kills other running attempt
+    verify(sched.backend).killTask(3, "exec2", true)
+    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
+    // killed, so the FakeTaskScheduler is only told about the successful completion
+    // of the speculated task.
+    assert(sched.endedTasks(3) === Success)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index b83ffa3282..6d726d3d59 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -83,7 +83,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
       val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
       jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
       jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-      taskInfo.markSuccessful()
+      taskInfo.markFinished(TaskState.FINISHED)
       val taskMetrics = TaskMetrics.empty
       taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
       jobListener.onTaskEnd(
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index 58beaf103c..6335d905c0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -110,7 +110,7 @@ class UIUtilsSuite extends SparkFunSuite {
   }
 
   test("SPARK-11906: Progress bar should not overflow because of speculative tasks") {
-    val generated = makeProgressBar(2, 3, 0, 0, 4).head.child.filter(_.label == "div")
+    val generated = makeProgressBar(2, 3, 0, 0, 0, 4).head.child.filter(_.label == "div")
     val expected = Seq(
       <div class="bar bar-completed" style="width: 75.0%"></div>,
       <div class="bar bar-running" style="width: 25.0%"></div>
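The new TaskSetManagerSuite test leans on one Mockito idiom: swap sched.backend for a mock so the kill is recorded instead of executed, then verify the exact call. The idiom in isolation (Killable is a hypothetical trait standing in for the killTask surface of SchedulerBackend; assumes mockito-core on the test classpath):

```scala
import org.mockito.Mockito.{mock, verify}

// Hypothetical stand-in for the killTask surface of Spark's SchedulerBackend.
trait Killable {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
}

object MockKillSketch {
  def main(args: Array[String]): Unit = {
    val backend = mock(classOf[Killable])
    // ...the code under test would make this call as a side effect...
    backend.killTask(3L, "exec2", interruptThread = true)
    // Fails with a descriptive error if the exact call never happened.
    verify(backend).killTask(3L, "exec2", true)
  }
}
```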
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 1fa9b28edf..edab727fc4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -243,7 +243,6 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       new FetchFailed(null, 0, 0, 0, "ignored"),
       ExceptionFailure("Exception", "description", null, null, None),
       TaskResultLost,
-      TaskKilled,
       ExecutorLostFailure("0", true, Some("Induced failure")),
       UnknownReason)
     var failCount = 0
@@ -255,6 +254,11 @@
       assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
     }
 
+    // Make sure killed tasks are accounted for correctly.
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(task.stageId, 0, taskType, TaskKilled, taskInfo, metrics))
+    assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === 1)
+
     // Make sure we count success as success.
     listener.onTaskEnd(
       SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6fda7378e6..0a8bbba6c5 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -966,6 +966,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Getting Result Time": 0,
       |    "Finish Time": 0,
       |    "Failed": false,
+      |    "Killed": false,
       |    "Accumulables": [
       |      {
       |        "ID": 1,
@@ -1012,6 +1013,7 @@
       |    "Getting Result Time": 0,
       |    "Finish Time": 0,
       |    "Failed": false,
+      |    "Killed": false,
       |    "Accumulables": [
       |      {
       |        "ID": 1,
@@ -1064,6 +1066,7 @@
       |    "Getting Result Time": 0,
       |    "Finish Time": 0,
       |    "Failed": false,
+      |    "Killed": false,
       |    "Accumulables": [
       |      {
       |        "ID": 1,
@@ -1161,6 +1164,7 @@
       |    "Getting Result Time": 0,
       |    "Finish Time": 0,
       |    "Failed": false,
+      |    "Killed": false,
       |    "Accumulables": [
       |      {
       |        "ID": 1,
@@ -1258,6 +1262,7 @@
       |    "Getting Result Time": 0,
       |    "Finish Time": 0,
       |    "Failed": false,
+      |    "Killed": false,
       |    "Accumulables": [
       |      {
       |        "ID": 1,