aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-06-26 21:13:26 -0700
committerReynold Xin <rxin@apache.org>2014-06-26 21:13:26 -0700
commitd1636dd72fc4966413baeb97ba55b313dc1da63d (patch)
treecb20bcc9304a4a3324b71b3b7b9f95c8523a0179 /core
parentbf578deaf2493081ceeb78dfd7617def5699a06e (diff)
downloadspark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.gz
spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.bz2
spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.zip
[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
New UI: ![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png) Author: Reynold Xin <rxin@apache.org> Closes #1236 from rxin/ui-task-attempt and squashes the following commits: 3b645dd [Reynold Xin] Expose attemptId in Stage. c0474b1 [Reynold Xin] Beefed up unit test. c404bdd [Reynold Xin] Fix ReplayListenerSuite. f56be4b [Reynold Xin] Fixed JsonProtocolSuite. e29e0f7 [Reynold Xin] Minor update. 5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala88
8 files changed, 102 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 378cf1aaeb..82163eadd5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
extends SparkListenerEvent
+@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 9a4be43ee2..8ec482a6f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
id
}
+ def attemptId: Int = nextAttemptId
+
val name = callSite.short
val details = callSite.long
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 4c62e4dc0b..6aecdfe8e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
class TaskInfo(
val taskId: Long,
val index: Int,
+ val attempt: Int,
val launchTime: Long,
val executorId: String,
val host: String,
- val taskLocality: TaskLocality.TaskLocality) {
+ val taskLocality: TaskLocality.TaskLocality,
+ val speculative: Boolean) {
/**
* The time when the task started remotely getting the result. Will not be set if the
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c0898f64fb..83ff6b8550 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
/**
* Dequeue a pending task for a given node and return its index and locality level.
* Only search for tasks matching the given locality constraint.
+ *
+ * @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
- : Option[(Int, TaskLocality.Value)] =
+ : Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
- return Some((index, TaskLocality.PROCESS_LOCAL))
+ return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
- return Some((index, TaskLocality.NODE_LOCAL))
+ return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
- return Some((index, TaskLocality.RACK_LOCAL))
+ return Some((index, TaskLocality.RACK_LOCAL, false))
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
- return Some((index, TaskLocality.PROCESS_LOCAL))
+ return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
- return Some((index, TaskLocality.ANY))
+ return Some((index, TaskLocality.ANY, false))
}
}
// Finally, if all else has failed, find a speculative task
- findSpeculativeTask(execId, host, locality)
+ findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+ (taskIndex, allowedLocality, true)
+ }
}
/**
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
}
findTask(execId, host, allowedLocality) match {
- case Some((index, taskLocality)) => {
+ case Some((index, taskLocality, speculative)) => {
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
- val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+ val attemptNum = taskAttempts(index).size
+ val info = new TaskInfo(
+ taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8b65f0671b..8e3d5d1cd4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</div>
// scalastyle:on
val taskHeaders: Seq[String] =
- Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
- Seq("Duration", "GC Time", "Result Ser Time") ++
+ Seq(
+ "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+ "Launch Time", "Duration", "GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<tr>
<td>{info.index}</td>
<td>{info.taskId}</td>
+ <td sorttable_customkey={info.attempt.toString}>{
+ if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+ }</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
+ <!--
+ TODO: Add this back after we add support to hide certain columns.
<td sorttable_customkey={serializationTime.toString}>
{if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
+ -->
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7cecbfe62a..6245b4b802 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
import org.apache.spark._
private[spark] object JsonProtocol {
+ // TODO: Remove this file and put JSON serialization into each individual class.
+
private implicit val format = DefaultFormats
/** ------------------------------------------------- *
@@ -194,10 +196,12 @@ private[spark] object JsonProtocol {
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
+ ("Attempt" -> taskInfo.attempt) ~
("Launch Time" -> taskInfo.launchTime) ~
("Executor ID" -> taskInfo.executorId) ~
("Host" -> taskInfo.host) ~
("Locality" -> taskInfo.taskLocality.toString) ~
+ ("Speculative" -> taskInfo.speculative) ~
("Getting Result Time" -> taskInfo.gettingResultTime) ~
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@ private[spark] object JsonProtocol {
def taskInfoFromJson(json: JValue): TaskInfo = {
val taskId = (json \ "Task ID").extract[Long]
val index = (json \ "Index").extract[Int]
+ val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
val launchTime = (json \ "Launch Time").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+ val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
val gettingResultTime = (json \ "Getting Result Time").extract[Long]
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
val serializedSize = (json \ "Serialized Size").extract[Int]
- val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+ val taskInfo =
+ new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
taskInfo.gettingResultTime = gettingResultTime
taskInfo.finishTime = finishTime
taskInfo.failed = failed
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e0fec6a068..fa43b66c6c 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
.shuffleRead == 1000)
// finish a task with unknown executor-id, nothing should happen
- taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo =
+ new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val metrics = new TaskMetrics()
- val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 495e1b7a0a..6c49870455 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
- val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
- val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+ val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+ val taskGettingResult =
+ SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
test("Dependent Classes") {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
- testTaskInfo(makeTaskInfo(999L, 888, 777L))
+ testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
+ assert(info1.attempt === info2.attempt)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)
assert(info1.taskLocality === info2.taskLocality)
+ assert(info1.speculative === info2.speculative)
assert(info1.gettingResultTime === info2.gettingResultTime)
assert(info1.finishTime === info2.finishTime)
assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
new StageInfo(a, "greetings", b, rddInfos, "details")
}
- private def makeTaskInfo(a: Long, b: Int, c: Long) = {
- new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+ private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+ new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite {
private val taskStartJsonString =
"""
- {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
- "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
- "Serialized Size":0}}
- """
+ |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
+ |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
+ |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
+ |"Failed":false,"Serialized Size":0}}
+ """.stripMargin
private val taskGettingResultJsonString =
"""
- {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index":
- 2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
- "Serialized Size":0}}
- """
+ |{"Event":"SparkListenerTaskGettingResult","Task Info":
+ | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
+ | "Finish Time":0,"Failed":false,"Serialized Size":0
+ | }
+ |}
+ """.stripMargin
private val taskEndJsonString =
"""
- {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
- "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
- 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
- false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
- "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
- "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
- 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
- 900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched":
- 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
- {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
- [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true,
- "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0,
- "Disk Size":0}}]}}
- """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Read Metrics":{
+ | "Shuffle Finish Time":900,
+ | "Total Blocks Fetched":1500,
+ | "Remote Blocks Fetched":800,
+ | "Local Blocks Fetched":700,
+ | "Fetch Wait Time":900,
+ | "Remote Bytes Read":1000
+ | },
+ | "Shuffle Write Metrics":{
+ | "Shuffle Bytes Written":1200,
+ | "Shuffle Write Time":1500},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]
+ | }
+ |}
+ """.stripMargin
private val jobStartJsonString =
"""