From d1636dd72fc4966413baeb97ba55b313dc1da63d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 26 Jun 2014 21:13:26 -0700 Subject: [SPARK-2297][UI] Make task attempt and speculation more explicit in UI. New UI: ![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png) Author: Reynold Xin Closes #1236 from rxin/ui-task-attempt and squashes the following commits: 3b645dd [Reynold Xin] Expose attemptId in Stage. c0474b1 [Reynold Xin] Beefed up unit test. c404bdd [Reynold Xin] Fix ReplayListenerSuite. f56be4b [Reynold Xin] Fixed JsonProtocolSuite. e29e0f7 [Reynold Xin] Minor update. 5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI. --- .../spark/ui/jobs/JobProgressListenerSuite.scala | 11 +-- .../org/apache/spark/util/JsonProtocolSuite.scala | 88 ++++++++++++++-------- 2 files changed, 63 insertions(+), 36 deletions(-) (limited to 'core/src/test') diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e0fec6a068..fa43b66c6c 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) - var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 var task = new ShuffleMapTask(0, null, null, 0, null) val taskType = Utils.getFormattedClassName(task) @@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc .shuffleRead == 1000) // finish a task with unknown executor-id, nothing should happen - taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL) + taskInfo = + new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true) taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) @@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) - taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) @@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) - taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL) + taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) @@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val conf = new SparkConf() val listener = new JobProgressListener(conf) val metrics = new TaskMetrics() - val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 val task = new ShuffleMapTask(0, null, null, 0, null) val taskType = Utils.getFormattedClassName(task) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 495e1b7a0a..6c49870455 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite { val stageSubmitted = SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties) val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L)) - val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L)) - val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L)) + val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false)) + val taskGettingResult = + SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, - makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800)) + makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite { test("Dependent Classes") { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) - testTaskInfo(makeTaskInfo(999L, 888, 777L)) + testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) @@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite { private def assertEquals(info1: TaskInfo, info2: TaskInfo) { assert(info1.taskId === info2.taskId) assert(info1.index === info2.index) + assert(info1.attempt === info2.attempt) assert(info1.launchTime === info2.launchTime) assert(info1.executorId === info2.executorId) assert(info1.host === info2.host) assert(info1.taskLocality === info2.taskLocality) + assert(info1.speculative === info2.speculative) assert(info1.gettingResultTime === info2.gettingResultTime) assert(info1.finishTime === info2.finishTime) assert(info1.failed === info2.failed) @@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite { new StageInfo(a, "greetings", b, rddInfos, "details") } - private def makeTaskInfo(a: Long, b: Int, c: Long) = { - new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL) + private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = { + new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) } private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = { @@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite { private val taskStartJsonString = """ - {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, - "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", - "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false, - "Serialized Size":0}} - """ + |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, + |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", + |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0, + |"Failed":false,"Serialized Size":0}} + """.stripMargin private val taskGettingResultJsonString = """ - {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index": - 2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir", - "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false, - "Serialized Size":0}} - """ + |{"Event":"SparkListenerTaskGettingResult","Task Info": + | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0, + | "Finish Time":0,"Failed":false,"Serialized Size":0 + | } + |} + """.stripMargin private val taskEndJsonString = """ - {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index": - 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir", - "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed": - false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost", - "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500, - "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled": - 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time": - 900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched": - 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": - {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": - [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true, - "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0, - "Disk Size":0}}]}} - """ + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Read Metrics":{ + | "Shuffle Finish Time":900, + | "Total Blocks Fetched":1500, + | "Remote Blocks Fetched":800, + | "Local Blocks Fetched":700, + | "Fetch Wait Time":900, + | "Remote Bytes Read":1000 + | }, + | "Shuffle Write Metrics":{ + | "Shuffle Bytes Written":1200, + | "Shuffle Write Time":1500}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ] + | } + |} + """.stripMargin private val jobStartJsonString = """ -- cgit v1.2.3