| author | Yuming Wang <wgyumg@gmail.com> | 2017-01-23 11:02:22 +0000 |
|---|---|---|
| committer | Sean Owen <sowen@cloudera.com> | 2017-01-23 11:02:22 +0000 |
| commit | c99492141b1ddddb8edb6841a6e83748e5ba9bba (patch) | |
| tree | 494ac6360097d0da4fe12a96a73d274dd2ce1800 /core/src | |
| parent | c4a6519c44f29950ef3d706a4f79e006ec8bc6b5 (diff) | |
[SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks
## What changes were proposed in this pull request?
Drop more elements when `stageData.taskData.size > retainedTasks` to reduce the number of times the `drop` function is called.
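In outline, once a collection exceeds its retained limit, the listener now removes at least one tenth of the limit instead of only the overflow, so trimming happens in larger, less frequent batches. A minimal standalone sketch of that rule (the helper name and formula are taken from the diff below; the example values mirror the new test):

```scala
// Batch-trim rule introduced by this patch: drop at least retainedSize / 10
// elements whenever the collection has grown past the retained limit.
def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int =
  math.max(retainedSize / 10, dataSize - retainedSize)

// With spark.ui.retainedTasks = 100:
//   101 stored tasks -> drop math.max(10, 1)  = 10, leaving 91
//   150 stored tasks -> drop math.max(10, 50) = 50, leaving 100
```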
## How was this patch tested?
Jenkins
Author: Yuming Wang <wgyumg@gmail.com>
Closes #16527 from wangyum/SPARK-19146.
Diffstat (limited to 'core/src'):

| File | Lines changed |
|---|---|
| core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 14 |
| core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 30 |

2 files changed, 41 insertions(+), 3 deletions(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d8745..e87caff426 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = (stages.size - retainedStages)
+      val toRemove = calculateNumberToRemove(stages.size, retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = (jobs.size - retainedJobs)
+      val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
     // If Tasks is too large, remove and garbage collect old tasks
     if (stageData.taskData.size > retainedTasks) {
-      stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+      stageData.taskData = stageData.taskData.drop(
+        calculateNumberToRemove(stageData.taskData.size, retainedTasks))
     }
 
     for (
@@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }
 
   /**
+   * Remove at least (maxRetained / 10) items to reduce friction.
+   */
+  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+    math.max(retainedSize / 10, dataSize - retainedSize)
+  }
+
+  /**
    * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
    * aggregate metrics by calculating deltas between the currently recorded metrics and the new
    * metrics.
```
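As a side note on the `stageData.taskData.drop(...)` call above: `taskData` is kept in an insertion-ordered map, so dropping n entries discards the n oldest tasks. A small illustration (assuming a `mutable.LinkedHashMap`, which is what the stage UI data uses in this version of Spark; the key/value types here are simplified):

```scala
import scala.collection.mutable.LinkedHashMap

// drop(n) on an insertion-ordered LinkedHashMap returns a new map without
// the n earliest-inserted entries, i.e. the oldest tasks are discarded first.
val taskData = LinkedHashMap(1L -> "task-1", 2L -> "task-2", 3L -> "task-3")
val trimmed = taskData.drop(1)
// trimmed == LinkedHashMap(2L -> "task-2", 3L -> "task-3")
```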
```diff
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1be8..e3127da9a6 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
   }
+
+  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedTasks", "100")
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.mergeShuffleReadMetrics()
+    val task = new ShuffleMapTask(0)
+    val taskType = Utils.getFormattedClassName(task)
+
+    val listener1 = new JobProgressListener(conf)
+    for (t <- 1 to 101) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener1.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 101 - math.max(100 / 10, 101 - 100) = 91
+    assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+    val listener2 = new JobProgressListener(conf)
+    for (t <- 1 to 150) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener2.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 150 - math.max(100 / 10, 150 - 100) = 100
+    assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+  }
 }
```