about | summary | refs | log | tree | commit | diff
path: root/core/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-07-17 18:58:48 -0700
committerReynold Xin <rxin@apache.org>2014-07-17 18:58:48 -0700
commit72e9021eaf26f31a82120505f8b764b18fbe8d48 (patch)
tree7fd67414ac728bc5b10217812aadab275f82c1a9 /core/src/test
parent935fe65ff6559a0e3b481e7508fa14337b23020b (diff)
downloadspark-72e9021eaf26f31a82120505f8b764b18fbe8d48.tar.gz
spark-72e9021eaf26f31a82120505f8b764b18fbe8d48.tar.bz2
spark-72e9021eaf26f31a82120505f8b764b18fbe8d48.zip
[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener
This should reduce memory usage for the web ui as well as slightly increase its speed in draining the UI event queue. @andrewor14 Author: Reynold Xin <rxin@apache.org> Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits: 1ac3f97 [Reynold Xin] Oops. Properly handle description. f5736ad [Reynold Xin] Code review comments. b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables 7a7b6c4 [Reynold Xin] Revert css change. f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up. 63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 36
1 file changed, 19 insertions, 17 deletions
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66c6c..a855662480 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
}
listener.completedStages.size should be (5)
- listener.completedStages.filter(_.stageId == 50).size should be (1)
- listener.completedStages.filter(_.stageId == 49).size should be (1)
- listener.completedStages.filter(_.stageId == 48).size should be (1)
- listener.completedStages.filter(_.stageId == 47).size should be (1)
- listener.completedStages.filter(_.stageId == 46).size should be (1)
+ listener.completedStages.count(_.stageId == 50) should be (1)
+ listener.completedStages.count(_.stageId == 49) should be (1)
+ listener.completedStages.count(_.stageId == 48) should be (1)
+ listener.completedStages.count(_.stageId == 47) should be (1)
+ listener.completedStages.count(_.stageId == 46) should be (1)
}
test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
-
- // nothing in it
- assert(listener.stageIdToExecutorSummaries.size == 0)
+ assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.size == 1)
+ assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 2000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 2000)
// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+ .shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
TaskKilled,
ExecutorLostFailure,
UnknownReason)
+ var failCount = 0
for (reason <- taskFailedReasons) {
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+ failCount += 1
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
// Make sure we count success as success.
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
}