about summary refs log tree commit diff
path: root/core/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-06-26 21:13:26 -0700
committerReynold Xin <rxin@apache.org>2014-06-26 21:13:26 -0700
commitd1636dd72fc4966413baeb97ba55b313dc1da63d (patch)
treecb20bcc9304a4a3324b71b3b7b9f95c8523a0179 /core/src/test
parentbf578deaf2493081ceeb78dfd7617def5699a06e (diff)
downloadspark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.gz
spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.bz2
spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.zip
[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
New UI: ![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png) Author: Reynold Xin <rxin@apache.org> Closes #1236 from rxin/ui-task-attempt and squashes the following commits: 3b645dd [Reynold Xin] Expose attemptId in Stage. c0474b1 [Reynold Xin] Beefed up unit test. c404bdd [Reynold Xin] Fix ReplayListenerSuite. f56be4b [Reynold Xin] Fixed JsonProtocolSuite. e29e0f7 [Reynold Xin] Minor update. 5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 11
-rw-r--r-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 88
2 files changed, 63 insertions, 36 deletions
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index e0fec6a068..fa43b66c6c 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -66,7 +66,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
@@ -75,7 +75,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
.shuffleRead == 1000)
// finish a task with unknown executor-id, nothing should happen
- taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo =
+ new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -84,7 +85,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -94,7 +95,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
- taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
@@ -106,7 +107,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val metrics = new TaskMetrics()
- val taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 495e1b7a0a..6c49870455 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,10 +35,11 @@ class JsonProtocolSuite extends FunSuite {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
- val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 444L))
- val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 3000L))
+ val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+ val taskGettingResult =
+ SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 345L), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -73,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
test("Dependent Classes") {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
- testTaskInfo(makeTaskInfo(999L, 888, 777L))
+ testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -269,10 +270,12 @@ class JsonProtocolSuite extends FunSuite {
private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
assert(info1.taskId === info2.taskId)
assert(info1.index === info2.index)
+ assert(info1.attempt === info2.attempt)
assert(info1.launchTime === info2.launchTime)
assert(info1.executorId === info2.executorId)
assert(info1.host === info2.host)
assert(info1.taskLocality === info2.taskLocality)
+ assert(info1.speculative === info2.speculative)
assert(info1.gettingResultTime === info2.gettingResultTime)
assert(info1.finishTime === info2.finishTime)
assert(info1.failed === info2.failed)
@@ -453,8 +456,8 @@ class JsonProtocolSuite extends FunSuite {
new StageInfo(a, "greetings", b, rddInfos, "details")
}
- private def makeTaskInfo(a: Long, b: Int, c: Long) = {
- new TaskInfo(a, b, c, "executor", "your kind sir", TaskLocality.NODE_LOCAL)
+ private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
+ new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
@@ -510,37 +513,60 @@ class JsonProtocolSuite extends FunSuite {
private val taskStartJsonString =
"""
- {"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
- "Index":333,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
- "Serialized Size":0}}
- """
+ |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
+ |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
+ |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
+ |"Failed":false,"Serialized Size":0}}
+ """.stripMargin
private val taskGettingResultJsonString =
"""
- {"Event":"SparkListenerTaskGettingResult","Task Info":{"Task ID":1000,"Index":
- 2000,"Launch Time":3000,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":false,
- "Serialized Size":0}}
- """
+ |{"Event":"SparkListenerTaskGettingResult","Task Info":
+ | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
+ | "Finish Time":0,"Failed":false,"Serialized Size":0
+ | }
+ |}
+ """.stripMargin
private val taskEndJsonString =
"""
- {"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
- "Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":123,"Index":
- 234,"Launch Time":345,"Executor ID":"executor","Host":"your kind sir",
- "Locality":"NODE_LOCAL","Getting Result Time":0,"Finish Time":0,"Failed":
- false,"Serialized Size":0},"Task Metrics":{"Host Name":"localhost",
- "Executor Deserialize Time":300,"Executor Run Time":400,"Result Size":500,
- "JVM GC Time":600,"Result Serialization Time":700,"Memory Bytes Spilled":
- 800,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Shuffle Finish Time":
- 900,"Total Blocks Fetched":1500,"Remote Blocks Fetched":800,"Local Blocks Fetched":
- 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
- {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
- [{"Block ID":"rdd_0_0","Status":{"Storage Level":{"Use Disk":true,"Use Memory":true,
- "Use Tachyon":false,"Deserialized":false,"Replication":2},"Memory Size":0,"Tachyon Size":0,
- "Disk Size":0}}]}}
- """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Read Metrics":{
+ | "Shuffle Finish Time":900,
+ | "Total Blocks Fetched":1500,
+ | "Remote Blocks Fetched":800,
+ | "Local Blocks Fetched":700,
+ | "Fetch Wait Time":900,
+ | "Remote Bytes Read":1000
+ | },
+ | "Shuffle Write Metrics":{
+ | "Shuffle Bytes Written":1200,
+ | "Shuffle Write Time":1500},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]
+ | }
+ |}
+ """.stripMargin
private val jobStartJsonString =
"""