aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorYuming Wang <wgyumg@gmail.com>2017-01-23 11:02:22 +0000
committerSean Owen <sowen@cloudera.com>2017-01-23 11:02:22 +0000
commitc99492141b1ddddb8edb6841a6e83748e5ba9bba (patch)
tree494ac6360097d0da4fe12a96a73d274dd2ce1800 /core
parentc4a6519c44f29950ef3d706a4f79e006ec8bc6b5 (diff)
downloadspark-c99492141b1ddddb8edb6841a6e83748e5ba9bba.tar.gz
spark-c99492141b1ddddb8edb6841a6e83748e5ba9bba.tar.bz2
spark-c99492141b1ddddb8edb6841a6e83748e5ba9bba.zip
[SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks
## What changes were proposed in this pull request? Drop more elements when `stageData.taskData.size > retainedTasks` to reduce the number of times on call drop function. ## How was this patch tested? Jenkins Author: Yuming Wang <wgyumg@gmail.com> Closes #16527 from wangyum/SPARK-19146.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala30
2 files changed, 41 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d8745..e87caff426 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
- val toRemove = (stages.size - retainedStages)
+ val toRemove = calculateNumberToRemove(stages.size, retainedStages)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If jobs is too large, remove and garbage collect old jobs */
private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
if (jobs.size > retainedJobs) {
- val toRemove = (jobs.size - retainedJobs)
+ val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
jobs.take(toRemove).foreach { job =>
// Remove the job's UI data, if it exists
jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// If Tasks is too large, remove and garbage collect old tasks
if (stageData.taskData.size > retainedTasks) {
- stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+ stageData.taskData = stageData.taskData.drop(
+ calculateNumberToRemove(stageData.taskData.size, retainedTasks))
}
for (
@@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
/**
+ * Remove at least (maxRetained / 10) items to reduce friction.
+ */
+ private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+ math.max(retainedSize / 10, dataSize - retainedSize)
+ }
+
+ /**
* Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
* aggregate metrics by calculating deltas between the currently recorded metrics and the new
* metrics.
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1be8..e3127da9a6 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
}
+
+ test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedTasks", "100")
+ val taskMetrics = TaskMetrics.empty
+ taskMetrics.mergeShuffleReadMetrics()
+ val task = new ShuffleMapTask(0)
+ val taskType = Utils.getFormattedClassName(task)
+
+ val listener1 = new JobProgressListener(conf)
+ for (t <- 1 to 101) {
+ val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+ taskInfo.finishTime = 1
+ listener1.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ }
+ // 101 - math.max(100 / 10, 101 - 100) = 91
+ assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+ val listener2 = new JobProgressListener(conf)
+ for (t <- 1 to 150) {
+ val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+ taskInfo.finishTime = 1
+ listener2.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ }
+ // 150 - math.max(100 / 10, 150 - 100) = 100
+ assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+ }
+
}