diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 83dc5d8745..e87caff426 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { /** If stages is too large, remove and garbage collect old stages */ private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { - val toRemove = (stages.size - retainedStages) + val toRemove = calculateNumberToRemove(stages.size, retainedStages) stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) stageIdToInfo.remove(s.stageId) @@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { /** If jobs is too large, remove and garbage collect old jobs */ private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized { if (jobs.size > retainedJobs) { - val toRemove = (jobs.size - retainedJobs) + val toRemove = calculateNumberToRemove(jobs.size, retainedJobs) jobs.take(toRemove).foreach { job => // Remove the job's UI data, if it exists jobIdToData.remove(job.jobId).foreach { removedJob => @@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // If Tasks is too large, remove and garbage collect old tasks if (stageData.taskData.size > retainedTasks) { - stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks) + stageData.taskData = stageData.taskData.drop( + calculateNumberToRemove(stageData.taskData.size, retainedTasks)) } for ( @@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } /** + * Remove at least (maxRetained / 10) items to reduce friction. + */ + private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = { + math.max(retainedSize / 10, dataSize - retainedSize) + } + + /** * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage * aggregate metrics by calculating deltas between the currently recorded metrics and the new * metrics. |